summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAditya Naik2020-08-03 14:42:06 -0400
committerAditya Naik2020-08-03 14:42:06 -0400
commit11a52e4e88bcad1e6bddcf02f78e1cc5eaaec508 (patch)
treed7f725c30e618994a0650607ede73ab3efc72c67 /src
parent81b3f3d5336eab67e8eb21bb4011e552f3895a4e (diff)
RTOS skeleton with (UNIX) POSIX calls. Reorg of stream files
Diffstat (limited to 'src')
-rw-r--r--src/heap_4.c492
-rw-r--r--src/master_posix.c315
-rw-r--r--src/master_rtos.c147
-rw-r--r--src/stream_i2c.c37
-rw-r--r--src/stream_stdio.c18
5 files changed, 954 insertions, 55 deletions
diff --git a/src/heap_4.c b/src/heap_4.c
new file mode 100644
index 0000000..ade9e47
--- /dev/null
+++ b/src/heap_4.c
@@ -0,0 +1,492 @@
+/*
+ * FreeRTOS Kernel V10.3.1
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * http://www.FreeRTOS.org
+ * http://aws.amazon.com/freertos
+ *
+ * 1 tab == 4 spaces!
+ */
+
+/*
+ * A sample implementation of pvPortMalloc() and vPortFree() that combines
+ * (coalescences) adjacent memory blocks as they are freed, and in so doing
+ * limits memory fragmentation.
+ *
+ * See heap_1.c, heap_2.c and heap_3.c for alternative implementations, and the
+ * memory management pages of http://www.FreeRTOS.org for more information.
+ */
+#include <stdlib.h>
+
+/* Defining MPU_WRAPPERS_INCLUDED_FROM_API_FILE prevents task.h from redefining
+all the API functions to use the MPU wrappers. That should only be done when
+task.h is included from an application file. */
+#define MPU_WRAPPERS_INCLUDED_FROM_API_FILE
+
+#include "FreeRTOS.h"
+#include "task.h"
+
+#undef MPU_WRAPPERS_INCLUDED_FROM_API_FILE
+
+#if( configSUPPORT_DYNAMIC_ALLOCATION == 0 )
+ #error This file must not be used if configSUPPORT_DYNAMIC_ALLOCATION is 0
+#endif
+
+/* Block sizes must not get too small. */
+#define heapMINIMUM_BLOCK_SIZE ( ( size_t ) ( xHeapStructSize << 1 ) )
+
+/* Assumes 8bit bytes! */
+#define heapBITS_PER_BYTE ( ( size_t ) 8 )
+
+/* Allocate the memory for the heap. */
+#if( configAPPLICATION_ALLOCATED_HEAP == 1 )
+ /* The application writer has already defined the array used for the RTOS
+ heap - probably so it can be placed in a special segment or address. */
+ extern uint8_t ucHeap[ configTOTAL_HEAP_SIZE ];
+#else
+ static uint8_t ucHeap[ configTOTAL_HEAP_SIZE ];
+#endif /* configAPPLICATION_ALLOCATED_HEAP */
+
+/* Define the linked list structure. This is used to link free blocks in order
+of their memory address. */
+typedef struct A_BLOCK_LINK
+{
+ struct A_BLOCK_LINK *pxNextFreeBlock; /*<< The next free block in the list. */
+ size_t xBlockSize; /*<< The size of the free block. */
+} BlockLink_t;
+
+/*-----------------------------------------------------------*/
+
+/*
+ * Inserts a block of memory that is being freed into the correct position in
+ * the list of free memory blocks. The block being freed will be merged with
+ * the block in front it and/or the block behind it if the memory blocks are
+ * adjacent to each other.
+ */
+static void prvInsertBlockIntoFreeList( BlockLink_t *pxBlockToInsert );
+
+/*
+ * Called automatically to setup the required heap structures the first time
+ * pvPortMalloc() is called.
+ */
+static void prvHeapInit( void );
+
+/*-----------------------------------------------------------*/
+
+/* The size of the structure placed at the beginning of each allocated memory
+block must by correctly byte aligned. */
+static const size_t xHeapStructSize = ( sizeof( BlockLink_t ) + ( ( size_t ) ( portBYTE_ALIGNMENT - 1 ) ) ) & ~( ( size_t ) portBYTE_ALIGNMENT_MASK );
+
+/* Create a couple of list links to mark the start and end of the list. */
+static BlockLink_t xStart, *pxEnd = NULL;
+
+/* Keeps track of the number of calls to allocate and free memory as well as the
+number of free bytes remaining, but says nothing about fragmentation. */
+static size_t xFreeBytesRemaining = 0U;
+static size_t xMinimumEverFreeBytesRemaining = 0U;
+static size_t xNumberOfSuccessfulAllocations = 0;
+static size_t xNumberOfSuccessfulFrees = 0;
+
+/* Gets set to the top bit of an size_t type. When this bit in the xBlockSize
+member of an BlockLink_t structure is set then the block belongs to the
+application. When the bit is free the block is still part of the free heap
+space. */
+static size_t xBlockAllocatedBit = 0;
+
+/*-----------------------------------------------------------*/
+
+void *pvPortMalloc( size_t xWantedSize )
+{
+BlockLink_t *pxBlock, *pxPreviousBlock, *pxNewBlockLink;
+void *pvReturn = NULL;
+
+ vTaskSuspendAll();
+ {
+ /* If this is the first call to malloc then the heap will require
+ initialisation to setup the list of free blocks. */
+ if( pxEnd == NULL )
+ {
+ prvHeapInit();
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+
+ /* Check the requested block size is not so large that the top bit is
+ set. The top bit of the block size member of the BlockLink_t structure
+ is used to determine who owns the block - the application or the
+ kernel, so it must be free. */
+ if( ( xWantedSize & xBlockAllocatedBit ) == 0 )
+ {
+ /* The wanted size is increased so it can contain a BlockLink_t
+ structure in addition to the requested amount of bytes. */
+ if( xWantedSize > 0 )
+ {
+ xWantedSize += xHeapStructSize;
+
+ /* Ensure that blocks are always aligned to the required number
+ of bytes. */
+ if( ( xWantedSize & portBYTE_ALIGNMENT_MASK ) != 0x00 )
+ {
+ /* Byte alignment required. */
+ xWantedSize += ( portBYTE_ALIGNMENT - ( xWantedSize & portBYTE_ALIGNMENT_MASK ) );
+ configASSERT( ( xWantedSize & portBYTE_ALIGNMENT_MASK ) == 0 );
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+
+ if( ( xWantedSize > 0 ) && ( xWantedSize <= xFreeBytesRemaining ) )
+ {
+ /* Traverse the list from the start (lowest address) block until
+ one of adequate size is found. */
+ pxPreviousBlock = &xStart;
+ pxBlock = xStart.pxNextFreeBlock;
+ while( ( pxBlock->xBlockSize < xWantedSize ) && ( pxBlock->pxNextFreeBlock != NULL ) )
+ {
+ pxPreviousBlock = pxBlock;
+ pxBlock = pxBlock->pxNextFreeBlock;
+ }
+
+ /* If the end marker was reached then a block of adequate size
+ was not found. */
+ if( pxBlock != pxEnd )
+ {
+ /* Return the memory space pointed to - jumping over the
+ BlockLink_t structure at its start. */
+ pvReturn = ( void * ) ( ( ( uint8_t * ) pxPreviousBlock->pxNextFreeBlock ) + xHeapStructSize );
+
+ /* This block is being returned for use so must be taken out
+ of the list of free blocks. */
+ pxPreviousBlock->pxNextFreeBlock = pxBlock->pxNextFreeBlock;
+
+ /* If the block is larger than required it can be split into
+ two. */
+ if( ( pxBlock->xBlockSize - xWantedSize ) > heapMINIMUM_BLOCK_SIZE )
+ {
+ /* This block is to be split into two. Create a new
+ block following the number of bytes requested. The void
+ cast is used to prevent byte alignment warnings from the
+ compiler. */
+ pxNewBlockLink = ( void * ) ( ( ( uint8_t * ) pxBlock ) + xWantedSize );
+ configASSERT( ( ( ( size_t ) pxNewBlockLink ) & portBYTE_ALIGNMENT_MASK ) == 0 );
+
+ /* Calculate the sizes of two blocks split from the
+ single block. */
+ pxNewBlockLink->xBlockSize = pxBlock->xBlockSize - xWantedSize;
+ pxBlock->xBlockSize = xWantedSize;
+
+ /* Insert the new block into the list of free blocks. */
+ prvInsertBlockIntoFreeList( pxNewBlockLink );
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+
+ xFreeBytesRemaining -= pxBlock->xBlockSize;
+
+ if( xFreeBytesRemaining < xMinimumEverFreeBytesRemaining )
+ {
+ xMinimumEverFreeBytesRemaining = xFreeBytesRemaining;
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+
+ /* The block is being returned - it is allocated and owned
+ by the application and has no "next" block. */
+ pxBlock->xBlockSize |= xBlockAllocatedBit;
+ pxBlock->pxNextFreeBlock = NULL;
+ xNumberOfSuccessfulAllocations++;
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+
+ traceMALLOC( pvReturn, xWantedSize );
+ }
+ ( void ) xTaskResumeAll();
+
+ #if( configUSE_MALLOC_FAILED_HOOK == 1 )
+ {
+ if( pvReturn == NULL )
+ {
+ extern void vApplicationMallocFailedHook( void );
+ vApplicationMallocFailedHook();
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+ }
+ #endif
+
+ configASSERT( ( ( ( size_t ) pvReturn ) & ( size_t ) portBYTE_ALIGNMENT_MASK ) == 0 );
+ return pvReturn;
+}
+/*-----------------------------------------------------------*/
+
+void vPortFree( void *pv )
+{
+uint8_t *puc = ( uint8_t * ) pv;
+BlockLink_t *pxLink;
+
+ if( pv != NULL )
+ {
+ /* The memory being freed will have an BlockLink_t structure immediately
+ before it. */
+ puc -= xHeapStructSize;
+
+ /* This casting is to keep the compiler from issuing warnings. */
+ pxLink = ( void * ) puc;
+
+ /* Check the block is actually allocated. */
+ configASSERT( ( pxLink->xBlockSize & xBlockAllocatedBit ) != 0 );
+ configASSERT( pxLink->pxNextFreeBlock == NULL );
+
+ if( ( pxLink->xBlockSize & xBlockAllocatedBit ) != 0 )
+ {
+ if( pxLink->pxNextFreeBlock == NULL )
+ {
+ /* The block is being returned to the heap - it is no longer
+ allocated. */
+ pxLink->xBlockSize &= ~xBlockAllocatedBit;
+
+ vTaskSuspendAll();
+ {
+ /* Add this block to the list of free blocks. */
+ xFreeBytesRemaining += pxLink->xBlockSize;
+ traceFREE( pv, pxLink->xBlockSize );
+ prvInsertBlockIntoFreeList( ( ( BlockLink_t * ) pxLink ) );
+ xNumberOfSuccessfulFrees++;
+ }
+ ( void ) xTaskResumeAll();
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+ }
+}
+/*-----------------------------------------------------------*/
+
+size_t xPortGetFreeHeapSize( void )
+{
+ return xFreeBytesRemaining;
+}
+/*-----------------------------------------------------------*/
+
+size_t xPortGetMinimumEverFreeHeapSize( void )
+{
+ return xMinimumEverFreeBytesRemaining;
+}
+/*-----------------------------------------------------------*/
+
+void vPortInitialiseBlocks( void )
+{
+ /* This just exists to keep the linker quiet. */
+}
+/*-----------------------------------------------------------*/
+
+static void prvHeapInit( void )
+{
+BlockLink_t *pxFirstFreeBlock;
+uint8_t *pucAlignedHeap;
+size_t uxAddress;
+size_t xTotalHeapSize = configTOTAL_HEAP_SIZE;
+
+ /* Ensure the heap starts on a correctly aligned boundary. */
+ uxAddress = ( size_t ) ucHeap;
+
+ if( ( uxAddress & portBYTE_ALIGNMENT_MASK ) != 0 )
+ {
+ uxAddress += ( portBYTE_ALIGNMENT - 1 );
+ uxAddress &= ~( ( size_t ) portBYTE_ALIGNMENT_MASK );
+ xTotalHeapSize -= uxAddress - ( size_t ) ucHeap;
+ }
+
+ pucAlignedHeap = ( uint8_t * ) uxAddress;
+
+ /* xStart is used to hold a pointer to the first item in the list of free
+ blocks. The void cast is used to prevent compiler warnings. */
+ xStart.pxNextFreeBlock = ( void * ) pucAlignedHeap;
+ xStart.xBlockSize = ( size_t ) 0;
+
+ /* pxEnd is used to mark the end of the list of free blocks and is inserted
+ at the end of the heap space. */
+ uxAddress = ( ( size_t ) pucAlignedHeap ) + xTotalHeapSize;
+ uxAddress -= xHeapStructSize;
+ uxAddress &= ~( ( size_t ) portBYTE_ALIGNMENT_MASK );
+ pxEnd = ( void * ) uxAddress;
+ pxEnd->xBlockSize = 0;
+ pxEnd->pxNextFreeBlock = NULL;
+
+ /* To start with there is a single free block that is sized to take up the
+ entire heap space, minus the space taken by pxEnd. */
+ pxFirstFreeBlock = ( void * ) pucAlignedHeap;
+ pxFirstFreeBlock->xBlockSize = uxAddress - ( size_t ) pxFirstFreeBlock;
+ pxFirstFreeBlock->pxNextFreeBlock = pxEnd;
+
+ /* Only one block exists - and it covers the entire usable heap space. */
+ xMinimumEverFreeBytesRemaining = pxFirstFreeBlock->xBlockSize;
+ xFreeBytesRemaining = pxFirstFreeBlock->xBlockSize;
+
+ /* Work out the position of the top bit in a size_t variable. */
+ xBlockAllocatedBit = ( ( size_t ) 1 ) << ( ( sizeof( size_t ) * heapBITS_PER_BYTE ) - 1 );
+}
+/*-----------------------------------------------------------*/
+
+static void prvInsertBlockIntoFreeList( BlockLink_t *pxBlockToInsert )
+{
+BlockLink_t *pxIterator;
+uint8_t *puc;
+
+ /* Iterate through the list until a block is found that has a higher address
+ than the block being inserted. */
+ for( pxIterator = &xStart; pxIterator->pxNextFreeBlock < pxBlockToInsert; pxIterator = pxIterator->pxNextFreeBlock )
+ {
+ /* Nothing to do here, just iterate to the right position. */
+ }
+
+ /* Do the block being inserted, and the block it is being inserted after
+ make a contiguous block of memory? */
+ puc = ( uint8_t * ) pxIterator;
+ if( ( puc + pxIterator->xBlockSize ) == ( uint8_t * ) pxBlockToInsert )
+ {
+ pxIterator->xBlockSize += pxBlockToInsert->xBlockSize;
+ pxBlockToInsert = pxIterator;
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+
+ /* Do the block being inserted, and the block it is being inserted before
+ make a contiguous block of memory? */
+ puc = ( uint8_t * ) pxBlockToInsert;
+ if( ( puc + pxBlockToInsert->xBlockSize ) == ( uint8_t * ) pxIterator->pxNextFreeBlock )
+ {
+ if( pxIterator->pxNextFreeBlock != pxEnd )
+ {
+ /* Form one big block from the two blocks. */
+ pxBlockToInsert->xBlockSize += pxIterator->pxNextFreeBlock->xBlockSize;
+ pxBlockToInsert->pxNextFreeBlock = pxIterator->pxNextFreeBlock->pxNextFreeBlock;
+ }
+ else
+ {
+ pxBlockToInsert->pxNextFreeBlock = pxEnd;
+ }
+ }
+ else
+ {
+ pxBlockToInsert->pxNextFreeBlock = pxIterator->pxNextFreeBlock;
+ }
+
+ /* If the block being inserted plugged a gab, so was merged with the block
+ before and the block after, then it's pxNextFreeBlock pointer will have
+ already been set, and should not be set here as that would make it point
+ to itself. */
+ if( pxIterator != pxBlockToInsert )
+ {
+ pxIterator->pxNextFreeBlock = pxBlockToInsert;
+ }
+ else
+ {
+ mtCOVERAGE_TEST_MARKER();
+ }
+}
+/*-----------------------------------------------------------*/
+
+void vPortGetHeapStats( HeapStats_t *pxHeapStats )
+{
+BlockLink_t *pxBlock;
+size_t xBlocks = 0, xMaxSize = 0, xMinSize = portMAX_DELAY; /* portMAX_DELAY used as a portable way of getting the maximum value. */
+
+ vTaskSuspendAll();
+ {
+ pxBlock = xStart.pxNextFreeBlock;
+
+ /* pxBlock will be NULL if the heap has not been initialised. The heap
+ is initialised automatically when the first allocation is made. */
+ if( pxBlock != NULL )
+ {
+ do
+ {
+ /* Increment the number of blocks and record the largest block seen
+ so far. */
+ xBlocks++;
+
+ if( pxBlock->xBlockSize > xMaxSize )
+ {
+ xMaxSize = pxBlock->xBlockSize;
+ }
+
+ if( pxBlock->xBlockSize < xMinSize )
+ {
+ xMinSize = pxBlock->xBlockSize;
+ }
+
+ /* Move to the next block in the chain until the last block is
+ reached. */
+ pxBlock = pxBlock->pxNextFreeBlock;
+ } while( pxBlock != pxEnd );
+ }
+ }
+ ( void ) xTaskResumeAll();
+
+ pxHeapStats->xSizeOfLargestFreeBlockInBytes = xMaxSize;
+ pxHeapStats->xSizeOfSmallestFreeBlockInBytes = xMinSize;
+ pxHeapStats->xNumberOfFreeBlocks = xBlocks;
+
+ taskENTER_CRITICAL();
+ {
+ pxHeapStats->xAvailableHeapSpaceInBytes = xFreeBytesRemaining;
+ pxHeapStats->xNumberOfSuccessfulAllocations = xNumberOfSuccessfulAllocations;
+ pxHeapStats->xNumberOfSuccessfulFrees = xNumberOfSuccessfulFrees;
+ pxHeapStats->xMinimumEverFreeBytesRemaining = xMinimumEverFreeBytesRemaining;
+ }
+ taskEXIT_CRITICAL();
+}
+
diff --git a/src/master_posix.c b/src/master_posix.c
new file mode 100644
index 0000000..355960d
--- /dev/null
+++ b/src/master_posix.c
@@ -0,0 +1,315 @@
+
+/*
+ * TODO Do conditional includes based on which target we are building for.
+ * This should be specified in the master config file
+ * TODO This file should be moved to the posix port folder
+*/
+
+/* FreeRTOS includes. */
+/* #include "FreeRTOS_POSIX.h" */
+
+/* System headers. */
+#include <stdbool.h>
+#include <string.h>
+#include <stdio.h>
+
+/* Demo includes. */
+/* #include "posix_demo.h" */
+
+/* FreeRTOS+POSIX. */
+/* #include "FreeRTOS_POSIX/pthread.h" */
+/* #include "FreeRTOS_POSIX/mqueue.h" */
+/* #include "FreeRTOS_POSIX/time.h" */
+/* #include "FreeRTOS_POSIX/fcntl.h" */
+/* #include "FreeRTOS_POSIX/errno.h" */
+
+#include <pthread.h>
+#include <mqueue.h>
+#include <stdint.h>
+#include <errno.h>
+/* Constants. */
+#define LINE_BREAK "\r\n"
+
+/**
+ * @brief Control messages.
+ *
+ * uint8_t is sufficient for this enum, that we are going to cast to char directly.
+ * If ever needed, implement a function to properly typecast.
+ */
+/**@{ */
+typedef enum ControlMessage
+{
+ eMSG_LOWER_INAVLID = 0x00, /**< Guard, let's not use 0x00 for messages. */
+ eWORKER_CTRL_MSG_CONTINUE = 0x01, /**< Dispatcher to worker, distributing another job. */
+ eWORKER_CTRL_MSG_EXIT = 0x02, /**< Dispatcher to worker, all jobs are finished and the worker receiving such can exit. */
+
+ /* define additional messages here */
+
+ eMSG_UPPER_INVALID = 0xFF /**< Guard, additional tasks shall be defined above. */
+} eControlMessage;
+/**@} */
+
+/**
+ * @defgroup Configuration constants for the dispatcher-worker demo.
+ */
+/**@{ */
+#define MQUEUE_NUMBER_OF_WORKERS ( 2 ) /**< The number of worker threads, each thread has one queue which is used as income box. */
+
+#if ( MQUEUE_NUMBER_OF_WORKERS > 10 )
+#error "Please keep MQUEUE_NUMBER_OF_WORKERS < 10."
+#endif
+
+#define MQUEUE_WORKER_QNAME_BASE "/qNode0" /**< Queue name base. */
+#define MQUEUE_WORKER_QNAME_BASE_LEN ( 6 ) /** Queue name base length. */
+
+#define MQUEUE_TIMEOUT_SECONDS ( 1 ) /**< Relative timeout for mqueue functions. */
+#define MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER ( 1 ) /**< Maximum number of messages in a queue. */
+
+#define MQUEUE_MSG_WORKER_CTRL_MSG_SIZE sizeof( uint8_t ) /**< Control message size. */
+#define DEMO_ERROR ( -1 ) /**< Any non-zero value would work. */
+/**@} */
+
+/**
+ * @brief Structure used by Worker thread.
+ */
+/**@{ */
+typedef struct WorkerThreadResources
+{
+ pthread_t pxID; /**< thread ID. */
+ mqd_t xInboxID; /**< mqueue inbox ID. */
+} WorkerThreadResources_t;
+/**@} */
+
+/**
+ * @brief Structure used by Dispatcher thread.
+ */
+/**@{ */
+typedef struct DispatcherThreadResources
+{
+ pthread_t pxID; /**< thread ID. */
+ mqd_t * pOutboxID; /**< a list of mqueue outbox ID. */
+} DispatcherThreadResources_t;
+/**@} */
+
+/*-----------------------------------------------------------*/
+
+static void * prvWorkerThread( void * pvArgs )
+{
+ WorkerThreadResources_t pArgList = *( WorkerThreadResources_t * ) pvArgs;
+
+ printf( "Worker thread #[%d] - start %s", ( int ) pArgList.pxID, LINE_BREAK );
+
+ struct timespec xReceiveTimeout = { 0 };
+
+ ssize_t xMessageSize = 0;
+ char pcReceiveBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 };
+
+ /* This is a worker thread that reacts based on what is sent to its inbox (mqueue). */
+ while( true )
+ {
+ clock_gettime( CLOCK_REALTIME, &xReceiveTimeout );
+ xReceiveTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;
+
+ xMessageSize = mq_receive( pArgList.xInboxID,
+ pcReceiveBuffer,
+ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ 0 );
+
+ /* Parse messages */
+ if( xMessageSize == MQUEUE_MSG_WORKER_CTRL_MSG_SIZE )
+ {
+ switch( ( int ) pcReceiveBuffer[ 0 ] )
+ {
+ case eWORKER_CTRL_MSG_CONTINUE:
+ /* Task branch, currently only prints message to screen. */
+ /* Could perform tasks here. Could also notify dispatcher upon completion, if desired. */
+ printf( "Worker thread #[%d] -- Received eWORKER_CTRL_MSG_CONTINUE %s", ( int ) pArgList.pxID, LINE_BREAK );
+ break;
+
+ case eWORKER_CTRL_MSG_EXIT:
+ printf( "Worker thread #[%d] -- Finished. Exit now. %s", ( int ) pArgList.pxID, LINE_BREAK );
+
+ return NULL;
+
+ default:
+ /* Received a message that we don't care or not defined. */
+ break;
+ }
+ }
+ else
+ {
+ /* Invalid message. Error handling can be done here, if desired. */
+ }
+ }
+
+ /* You should never hit here. */
+ /* return NULL; */
+}
+
+/*-----------------------------------------------------------*/
+
+static void * prvDispatcherThread( void * pvArgs )
+{
+ DispatcherThreadResources_t pArgList = *( DispatcherThreadResources_t * ) pvArgs;
+
+ printf( "Dispatcher thread - start %s", LINE_BREAK );
+
+ struct timespec xSendTimeout = { 0 };
+
+ ssize_t xMessageSize = 0;
+ char pcSendBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 };
+
+ /* Just for fun, let threads do a total of 100 independent tasks. */
+ int i = 0;
+ const int totalNumOfJobsPerThread = 100;
+
+ /* Distribute 1000 independent tasks to workers, in round-robin fashion. */
+ pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_CONTINUE;
+
+ for( i = 0; i < totalNumOfJobsPerThread; i++ )
+ {
+ clock_gettime( CLOCK_REALTIME, &xSendTimeout );
+ xSendTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;
+
+ printf( "Dispatcher iteration #[%d] -- Sending msg to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK );
+
+ xMessageSize = mq_timedsend( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ],
+ pcSendBuffer,
+ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ 0,
+ &xSendTimeout );
+
+ if( xMessageSize != 0 )
+ {
+ /* This error is acceptable in our setup.
+ * Since inbox for each thread fits only one message.
+ * In reality, balance inbox size, message arrival rate, and message drop rate. */
+ printf( "An acceptable failure -- dispatcher failed to send eWORKER_CTRL_MSG_CONTINUE to outbox ID: %x. errno %d %s",
+ ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], errno, LINE_BREAK );
+ }
+ }
+
+ /* Control thread is now done with distributing jobs. Tell workers they are done. */
+ pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_EXIT;
+
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ printf( "Dispatcher [%d] -- Sending eWORKER_CTRL_MSG_EXIT to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK );
+
+ /* This is a blocking call, to guarantee worker thread exits. */
+ xMessageSize = mq_send( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ],
+ pcSendBuffer,
+ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ 0 );
+ }
+
+ return NULL;
+}
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Job distribution with actor model.
+ *
+ * See the top of this file for detailed description.
+ */
+void vStartPOSIXMaster()
+{
+ int i = 0;
+ int iStatus = 0;
+
+ /* Remove warnings about unused parameters. */
+
+ /* Handles of the threads and related resources. */
+ DispatcherThreadResources_t pxDispatcher = { 0 };
+ WorkerThreadResources_t pxWorkers[ MQUEUE_NUMBER_OF_WORKERS ] = { { 0 } };
+ mqd_t workerMqueues[ MQUEUE_NUMBER_OF_WORKERS ] = { 0 };
+
+ struct mq_attr xQueueAttributesWorker =
+ {
+ .mq_flags = 0,
+ .mq_maxmsg = MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER,
+ .mq_msgsize = MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ .mq_curmsgs = 0
+ };
+
+ pxDispatcher.pOutboxID = workerMqueues;
+
+ /* Create message queues for each worker thread. */
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ /* Prepare a unique queue name for each worker. */
+ char qName[] = MQUEUE_WORKER_QNAME_BASE;
+ qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i;
+
+ /* Open a queue with --
+ * O_CREAT -- create a message queue.
+ * O_RDWR -- both receiving and sending messages.
+ */
+ pxWorkers[ i ].xInboxID = mq_open( qName,
+ O_CREAT | O_RDWR,
+ ( mode_t ) 0,
+ &xQueueAttributesWorker );
+
+ if( pxWorkers[ i ].xInboxID == ( mqd_t ) -1 )
+ {
+ printf( "Invalid inbox (mqueue) for worker. %s", LINE_BREAK );
+ iStatus = DEMO_ERROR;
+ break;
+ }
+
+ /* Outboxes of dispatcher thread is the inboxes of all worker threads. */
+ pxDispatcher.pOutboxID[ i ] = pxWorkers[ i ].xInboxID;
+ }
+
+ /* Create and start Worker threads. */
+ if( iStatus == 0 )
+ {
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ ( void ) pthread_create( &( pxWorkers[ i ].pxID ), NULL, prvWorkerThread, &pxWorkers[ i ] );
+ }
+
+ /* Create and start dispatcher thread. */
+ ( void ) pthread_create( &( pxDispatcher.pxID ), NULL, prvDispatcherThread, &pxDispatcher );
+
+ /* Actors will do predefined tasks in threads. Current implementation is that
+ * dispatcher actor notifies worker actors to terminate upon finishing distributing tasks. */
+
+ /* Wait for worker threads to join. */
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ ( void ) pthread_join( pxWorkers[ i ].pxID, NULL );
+ }
+
+ /* Wait for dispatcher thread to join. */
+ ( void ) pthread_join( pxDispatcher.pxID, NULL );
+ }
+
+ /* Close and unlink worker message queues. */
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ char qName[] = MQUEUE_WORKER_QNAME_BASE;
+ qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i;
+
+ if( pxWorkers[ i ].xInboxID != NULL )
+ {
+ ( void ) mq_close( pxWorkers[ i ].xInboxID );
+ ( void ) mq_unlink( qName );
+ }
+ }
+
+ /* Have something on console. */
+ if( iStatus == 0 )
+ {
+ printf( "All threads finished. %s", LINE_BREAK );
+ }
+ else
+ {
+ printf( "Queues did not get initialized properly. Did not run demo. %s", LINE_BREAK );
+ }
+
+ /* This task was created with the native xTaskCreate() API function, so
+ must not run off the end of its implementing thread. */
+ /* vTaskDelete( NULL ); */
+}
diff --git a/src/master_rtos.c b/src/master_rtos.c
new file mode 100644
index 0000000..4ca3461
--- /dev/null
+++ b/src/master_rtos.c
@@ -0,0 +1,147 @@
+/* FreeRTOS includes. */
+#include "FreeRTOS.h"
+#include "task.h"
+
+/* System headers */
+#include <stdio.h>
+
+/* Master include */
+#include "master_posix.h"
+
+/* Master task priority */
+#define mainPOSIX_MASTER_PRIORITY (tskIDLE_PRIORITY + 4)
+
+int main(void)
+{
+ configASSERT((mainPOSIX_MASTER_PRIORITY < configMAX_PRIORITIES));
+
+ /* const uint32_t ulLongTime_ms = pdMS_TO_TICKS(1000UL); */
+
+ /* Start the task to run POSIX Master */
+ xTaskCreate(vStartPOSIXMaster,
+ "posix",
+ configMINIMAL_STACK_SIZE,
+ NULL,
+ mainPOSIX_MASTER_PRIORITY,
+ NULL);
+
+ vTaskStartScheduler();
+
+ /* If all is well, the scheduler will now be running, and the following
+ * line will never be reached. If the following line does execute, then
+ * there was insufficient FreeRTOS heap memory available for the idle and/or
+ * timer tasks to be created.
+ */
+ for(; ;)
+ {
+ /* Sleep(ulLongTime_ms); */
+ }
+
+ return 0;
+}
+
+
+/*-----------------------------------------------------------*/
+
+void vAssertCalled(const char * pcFile,
+ uint32_t ulLine)
+{
+ /* const uint32_t ulLongSleep = 1000UL; */
+ volatile uint32_t ulBlockVariable = 0UL;
+ volatile char * pcFileName = ( volatile char * ) pcFile;
+ volatile uint32_t ulLineNumber = ulLine;
+
+ (void) pcFileName;
+ (void) ulLineNumber;
+
+ printf("vAssertCalled %s, %ld\n", pcFile, (long) ulLine);
+ fflush(stdout);
+
+ /* Setting ulBlockVariable to a non-zero value in the debugger will allow
+ * this function to be exited. */
+ taskDISABLE_INTERRUPTS();
+ {
+ while(ulBlockVariable == 0UL)
+ {
+ /* Sleep(ulLongSleep); */
+ }
+ }
+ taskENABLE_INTERRUPTS();
+}
+
+/*-----------------------------------------------------------*/
+
+/* configUSE_STATIC_ALLOCATION is set to 1, so the application must provide an
+ * implementation of vApplicationGetIdleTaskMemory() to provide the memory that is
+ * used by the Idle task. */
+void vApplicationGetIdleTaskMemory(StaticTask_t ** ppxIdleTaskTCBBuffer,
+ StackType_t ** ppxIdleTaskStackBuffer,
+ uint32_t * pulIdleTaskStackSize)
+{
+ /* If the buffers to be provided to the Idle task are declared inside this
+ * function then they must be declared static - otherwise they will be allocated on
+ * the stack and so not exists after this function exits. */
+ static StaticTask_t xIdleTaskTCB;
+ static StackType_t uxIdleTaskStack[ configMINIMAL_STACK_SIZE ];
+
+ /* Pass out a pointer to the StaticTask_t structure in which the Idle
+ * task's state will be stored. */
+ *ppxIdleTaskTCBBuffer = &xIdleTaskTCB;
+
+ /* Pass out the array that will be used as the Idle task's stack. */
+ *ppxIdleTaskStackBuffer = uxIdleTaskStack;
+
+ /* Pass out the size of the array pointed to by *ppxIdleTaskStackBuffer.
+ * Note that, as the array is necessarily of type StackType_t,
+ * configMINIMAL_STACK_SIZE is specified in words, not bytes. */
+ *pulIdleTaskStackSize = configMINIMAL_STACK_SIZE;
+}
+
+/*-----------------------------------------------------------*/
+
+/* configUSE_STATIC_ALLOCATION and configUSE_TIMERS are both set to 1, so the
+ * application must provide an implementation of vApplicationGetTimerTaskMemory()
+ * to provide the memory that is used by the Timer service task. */
+void vApplicationGetTimerTaskMemory(StaticTask_t ** ppxTimerTaskTCBBuffer,
+ StackType_t ** ppxTimerTaskStackBuffer,
+ uint32_t * pulTimerTaskStackSize)
+{
+ /* If the buffers to be provided to the Timer task are declared inside this
+ * function then they must be declared static - otherwise they will be allocated on
+ * the stack and so not exists after this function exits. */
+ static StaticTask_t xTimerTaskTCB;
+ static StackType_t uxTimerTaskStack[ configTIMER_TASK_STACK_DEPTH ];
+
+ /* Pass out a pointer to the StaticTask_t structure in which the Timer
+ * task's state will be stored. */
+ *ppxTimerTaskTCBBuffer = &xTimerTaskTCB;
+
+ /* Pass out the array that will be used as the Timer task's stack. */
+ *ppxTimerTaskStackBuffer = uxTimerTaskStack;
+
+ /* Pass out the size of the array pointed to by *ppxTimerTaskStackBuffer.
+ * Note that, as the array is necessarily of type StackType_t,
+ * configTIMER_TASK_STACK_DEPTH is specified in words, not bytes. */
+ *pulTimerTaskStackSize = configTIMER_TASK_STACK_DEPTH;
+}
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Warn user if pvPortMalloc fails.
+ *
+ * Called if a call to pvPortMalloc() fails because there is insufficient
+ * free memory available in the FreeRTOS heap. pvPortMalloc() is called
+ * internally by FreeRTOS API functions that create tasks, queues, software
+ * timers, and semaphores. The size of the FreeRTOS heap is set by the
+ * configTOTAL_HEAP_SIZE configuration constant in FreeRTOSConfig.h.
+ *
+ */
+void vApplicationMallocFailedHook()
+{
+ taskDISABLE_INTERRUPTS();
+
+ for(;;)
+ {
+ }
+}
diff --git a/src/stream_i2c.c b/src/stream_i2c.c
deleted file mode 100644
index b149756..0000000
--- a/src/stream_i2c.c
+++ /dev/null
@@ -1,37 +0,0 @@
-#include "stream_i2c.h"
-#include "port.h"
-#include "stream.h"
-
-int i2c_read(uint8_t* buf, size_t count, void **vptr, void *sptr)
-{
- p_stream_t *stream = (p_stream_t*)sptr;
- I2C_HandleTypeDef dev = *(I2C_HandleTypeDef*)stream->props[DEVICE];
-
- uint16_t addr = *(uint16_t*)vptr[0];
- uint32_t timeout = *(uint32_t*)vptr[1];
- uint16_t AF_limit = *(uint32_t*)vptr[2];
-
- int error, AF_counter = 0;
- while (HAL_I2C_Master_Receive(&dev, addr, buf, count, timeout) != HAL_OK) {
- if ((error = HAL_I2C_GetError(&dev)) != HAL_I2C_ERROR_AF) {
- return error;
- }
- else if (++AF_counter > AF_limit) {
- return HAL_I2C_ERROR_AF;
- }
- }
- return 0;
-}
-
-int i2c_write(uint8_t* buf, size_t count, void **vptr, void *sptr)
-{
- p_stream_t *stream = (p_stream_t*)sptr;
- I2C_HandleTypeDef dev = *(I2C_HandleTypeDef*)stream->props[DEVICE];
- uint16_t addr = *(uint16_t*)vptr[1];
- uint32_t timeout = *(uint32_t*)vptr[2];
-
- while (HAL_I2C_Master_Transmit(&dev, addr, buf, count, timeout) != HAL_OK) {
- return HAL_I2C_GetError(&dev);
- }
- return 0;
-}
diff --git a/src/stream_stdio.c b/src/stream_stdio.c
deleted file mode 100644
index efa4b13..0000000
--- a/src/stream_stdio.c
+++ /dev/null
@@ -1,18 +0,0 @@
-#include <stdio.h>
-#include "stream_stdio.h"
-#include "stream.h"
-
-int stdio_read(uint8_t* buf, size_t count, void **vptr, void *sptr)
-{
- int x;
- for (x = 0; x < count; x++) {
- scanf("%c", &buf[x]);
- }
- return 0;
-}
-
-int stdio_write(uint8_t* buf, size_t count, void **vptr, void *sptr)
-{
- printf("%s\n", buf);
- return 0;
-}