From 11a52e4e88bcad1e6bddcf02f78e1cc5eaaec508 Mon Sep 17 00:00:00 2001 From: Aditya Naik Date: Mon, 3 Aug 2020 14:42:06 -0400 Subject: RTOS skeleton with (UNIX) POSIX calls. Reorg of stream files --- src/heap_4.c | 492 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/master_posix.c | 315 ++++++++++++++++++++++++++++++++++ src/master_rtos.c | 147 ++++++++++++++++ src/stream_i2c.c | 37 ---- src/stream_stdio.c | 18 -- 5 files changed, 954 insertions(+), 55 deletions(-) create mode 100644 src/heap_4.c create mode 100644 src/master_posix.c create mode 100644 src/master_rtos.c delete mode 100644 src/stream_i2c.c delete mode 100644 src/stream_stdio.c (limited to 'src') diff --git a/src/heap_4.c b/src/heap_4.c new file mode 100644 index 0000000..ade9e47 --- /dev/null +++ b/src/heap_4.c @@ -0,0 +1,492 @@ +/* + * FreeRTOS Kernel V10.3.1 + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * http://www.FreeRTOS.org + * http://aws.amazon.com/freertos + * + * 1 tab == 4 spaces! + */ + +/* + * A sample implementation of pvPortMalloc() and vPortFree() that combines + * (coalescences) adjacent memory blocks as they are freed, and in so doing + * limits memory fragmentation. + * + * See heap_1.c, heap_2.c and heap_3.c for alternative implementations, and the + * memory management pages of http://www.FreeRTOS.org for more information. + */ +#include + +/* Defining MPU_WRAPPERS_INCLUDED_FROM_API_FILE prevents task.h from redefining +all the API functions to use the MPU wrappers. That should only be done when +task.h is included from an application file. */ +#define MPU_WRAPPERS_INCLUDED_FROM_API_FILE + +#include "FreeRTOS.h" +#include "task.h" + +#undef MPU_WRAPPERS_INCLUDED_FROM_API_FILE + +#if( configSUPPORT_DYNAMIC_ALLOCATION == 0 ) + #error This file must not be used if configSUPPORT_DYNAMIC_ALLOCATION is 0 +#endif + +/* Block sizes must not get too small. */ +#define heapMINIMUM_BLOCK_SIZE ( ( size_t ) ( xHeapStructSize << 1 ) ) + +/* Assumes 8bit bytes! */ +#define heapBITS_PER_BYTE ( ( size_t ) 8 ) + +/* Allocate the memory for the heap. */ +#if( configAPPLICATION_ALLOCATED_HEAP == 1 ) + /* The application writer has already defined the array used for the RTOS + heap - probably so it can be placed in a special segment or address. */ + extern uint8_t ucHeap[ configTOTAL_HEAP_SIZE ]; +#else + static uint8_t ucHeap[ configTOTAL_HEAP_SIZE ]; +#endif /* configAPPLICATION_ALLOCATED_HEAP */ + +/* Define the linked list structure. This is used to link free blocks in order +of their memory address. */ +typedef struct A_BLOCK_LINK +{ + struct A_BLOCK_LINK *pxNextFreeBlock; /*<< The next free block in the list. */ + size_t xBlockSize; /*<< The size of the free block. */ +} BlockLink_t; + +/*-----------------------------------------------------------*/ + +/* + * Inserts a block of memory that is being freed into the correct position in + * the list of free memory blocks. The block being freed will be merged with + * the block in front it and/or the block behind it if the memory blocks are + * adjacent to each other. + */ +static void prvInsertBlockIntoFreeList( BlockLink_t *pxBlockToInsert ); + +/* + * Called automatically to setup the required heap structures the first time + * pvPortMalloc() is called. + */ +static void prvHeapInit( void ); + +/*-----------------------------------------------------------*/ + +/* The size of the structure placed at the beginning of each allocated memory +block must by correctly byte aligned. */ +static const size_t xHeapStructSize = ( sizeof( BlockLink_t ) + ( ( size_t ) ( portBYTE_ALIGNMENT - 1 ) ) ) & ~( ( size_t ) portBYTE_ALIGNMENT_MASK ); + +/* Create a couple of list links to mark the start and end of the list. */ +static BlockLink_t xStart, *pxEnd = NULL; + +/* Keeps track of the number of calls to allocate and free memory as well as the +number of free bytes remaining, but says nothing about fragmentation. */ +static size_t xFreeBytesRemaining = 0U; +static size_t xMinimumEverFreeBytesRemaining = 0U; +static size_t xNumberOfSuccessfulAllocations = 0; +static size_t xNumberOfSuccessfulFrees = 0; + +/* Gets set to the top bit of an size_t type. When this bit in the xBlockSize +member of an BlockLink_t structure is set then the block belongs to the +application. When the bit is free the block is still part of the free heap +space. */ +static size_t xBlockAllocatedBit = 0; + +/*-----------------------------------------------------------*/ + +void *pvPortMalloc( size_t xWantedSize ) +{ +BlockLink_t *pxBlock, *pxPreviousBlock, *pxNewBlockLink; +void *pvReturn = NULL; + + vTaskSuspendAll(); + { + /* If this is the first call to malloc then the heap will require + initialisation to setup the list of free blocks. */ + if( pxEnd == NULL ) + { + prvHeapInit(); + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + + /* Check the requested block size is not so large that the top bit is + set. The top bit of the block size member of the BlockLink_t structure + is used to determine who owns the block - the application or the + kernel, so it must be free. */ + if( ( xWantedSize & xBlockAllocatedBit ) == 0 ) + { + /* The wanted size is increased so it can contain a BlockLink_t + structure in addition to the requested amount of bytes. */ + if( xWantedSize > 0 ) + { + xWantedSize += xHeapStructSize; + + /* Ensure that blocks are always aligned to the required number + of bytes. */ + if( ( xWantedSize & portBYTE_ALIGNMENT_MASK ) != 0x00 ) + { + /* Byte alignment required. */ + xWantedSize += ( portBYTE_ALIGNMENT - ( xWantedSize & portBYTE_ALIGNMENT_MASK ) ); + configASSERT( ( xWantedSize & portBYTE_ALIGNMENT_MASK ) == 0 ); + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + + if( ( xWantedSize > 0 ) && ( xWantedSize <= xFreeBytesRemaining ) ) + { + /* Traverse the list from the start (lowest address) block until + one of adequate size is found. */ + pxPreviousBlock = &xStart; + pxBlock = xStart.pxNextFreeBlock; + while( ( pxBlock->xBlockSize < xWantedSize ) && ( pxBlock->pxNextFreeBlock != NULL ) ) + { + pxPreviousBlock = pxBlock; + pxBlock = pxBlock->pxNextFreeBlock; + } + + /* If the end marker was reached then a block of adequate size + was not found. */ + if( pxBlock != pxEnd ) + { + /* Return the memory space pointed to - jumping over the + BlockLink_t structure at its start. */ + pvReturn = ( void * ) ( ( ( uint8_t * ) pxPreviousBlock->pxNextFreeBlock ) + xHeapStructSize ); + + /* This block is being returned for use so must be taken out + of the list of free blocks. */ + pxPreviousBlock->pxNextFreeBlock = pxBlock->pxNextFreeBlock; + + /* If the block is larger than required it can be split into + two. */ + if( ( pxBlock->xBlockSize - xWantedSize ) > heapMINIMUM_BLOCK_SIZE ) + { + /* This block is to be split into two. Create a new + block following the number of bytes requested. The void + cast is used to prevent byte alignment warnings from the + compiler. */ + pxNewBlockLink = ( void * ) ( ( ( uint8_t * ) pxBlock ) + xWantedSize ); + configASSERT( ( ( ( size_t ) pxNewBlockLink ) & portBYTE_ALIGNMENT_MASK ) == 0 ); + + /* Calculate the sizes of two blocks split from the + single block. */ + pxNewBlockLink->xBlockSize = pxBlock->xBlockSize - xWantedSize; + pxBlock->xBlockSize = xWantedSize; + + /* Insert the new block into the list of free blocks. */ + prvInsertBlockIntoFreeList( pxNewBlockLink ); + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + + xFreeBytesRemaining -= pxBlock->xBlockSize; + + if( xFreeBytesRemaining < xMinimumEverFreeBytesRemaining ) + { + xMinimumEverFreeBytesRemaining = xFreeBytesRemaining; + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + + /* The block is being returned - it is allocated and owned + by the application and has no "next" block. */ + pxBlock->xBlockSize |= xBlockAllocatedBit; + pxBlock->pxNextFreeBlock = NULL; + xNumberOfSuccessfulAllocations++; + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + + traceMALLOC( pvReturn, xWantedSize ); + } + ( void ) xTaskResumeAll(); + + #if( configUSE_MALLOC_FAILED_HOOK == 1 ) + { + if( pvReturn == NULL ) + { + extern void vApplicationMallocFailedHook( void ); + vApplicationMallocFailedHook(); + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + } + #endif + + configASSERT( ( ( ( size_t ) pvReturn ) & ( size_t ) portBYTE_ALIGNMENT_MASK ) == 0 ); + return pvReturn; +} +/*-----------------------------------------------------------*/ + +void vPortFree( void *pv ) +{ +uint8_t *puc = ( uint8_t * ) pv; +BlockLink_t *pxLink; + + if( pv != NULL ) + { + /* The memory being freed will have an BlockLink_t structure immediately + before it. */ + puc -= xHeapStructSize; + + /* This casting is to keep the compiler from issuing warnings. */ + pxLink = ( void * ) puc; + + /* Check the block is actually allocated. */ + configASSERT( ( pxLink->xBlockSize & xBlockAllocatedBit ) != 0 ); + configASSERT( pxLink->pxNextFreeBlock == NULL ); + + if( ( pxLink->xBlockSize & xBlockAllocatedBit ) != 0 ) + { + if( pxLink->pxNextFreeBlock == NULL ) + { + /* The block is being returned to the heap - it is no longer + allocated. */ + pxLink->xBlockSize &= ~xBlockAllocatedBit; + + vTaskSuspendAll(); + { + /* Add this block to the list of free blocks. */ + xFreeBytesRemaining += pxLink->xBlockSize; + traceFREE( pv, pxLink->xBlockSize ); + prvInsertBlockIntoFreeList( ( ( BlockLink_t * ) pxLink ) ); + xNumberOfSuccessfulFrees++; + } + ( void ) xTaskResumeAll(); + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + } +} +/*-----------------------------------------------------------*/ + +size_t xPortGetFreeHeapSize( void ) +{ + return xFreeBytesRemaining; +} +/*-----------------------------------------------------------*/ + +size_t xPortGetMinimumEverFreeHeapSize( void ) +{ + return xMinimumEverFreeBytesRemaining; +} +/*-----------------------------------------------------------*/ + +void vPortInitialiseBlocks( void ) +{ + /* This just exists to keep the linker quiet. */ +} +/*-----------------------------------------------------------*/ + +static void prvHeapInit( void ) +{ +BlockLink_t *pxFirstFreeBlock; +uint8_t *pucAlignedHeap; +size_t uxAddress; +size_t xTotalHeapSize = configTOTAL_HEAP_SIZE; + + /* Ensure the heap starts on a correctly aligned boundary. */ + uxAddress = ( size_t ) ucHeap; + + if( ( uxAddress & portBYTE_ALIGNMENT_MASK ) != 0 ) + { + uxAddress += ( portBYTE_ALIGNMENT - 1 ); + uxAddress &= ~( ( size_t ) portBYTE_ALIGNMENT_MASK ); + xTotalHeapSize -= uxAddress - ( size_t ) ucHeap; + } + + pucAlignedHeap = ( uint8_t * ) uxAddress; + + /* xStart is used to hold a pointer to the first item in the list of free + blocks. The void cast is used to prevent compiler warnings. */ + xStart.pxNextFreeBlock = ( void * ) pucAlignedHeap; + xStart.xBlockSize = ( size_t ) 0; + + /* pxEnd is used to mark the end of the list of free blocks and is inserted + at the end of the heap space. */ + uxAddress = ( ( size_t ) pucAlignedHeap ) + xTotalHeapSize; + uxAddress -= xHeapStructSize; + uxAddress &= ~( ( size_t ) portBYTE_ALIGNMENT_MASK ); + pxEnd = ( void * ) uxAddress; + pxEnd->xBlockSize = 0; + pxEnd->pxNextFreeBlock = NULL; + + /* To start with there is a single free block that is sized to take up the + entire heap space, minus the space taken by pxEnd. */ + pxFirstFreeBlock = ( void * ) pucAlignedHeap; + pxFirstFreeBlock->xBlockSize = uxAddress - ( size_t ) pxFirstFreeBlock; + pxFirstFreeBlock->pxNextFreeBlock = pxEnd; + + /* Only one block exists - and it covers the entire usable heap space. */ + xMinimumEverFreeBytesRemaining = pxFirstFreeBlock->xBlockSize; + xFreeBytesRemaining = pxFirstFreeBlock->xBlockSize; + + /* Work out the position of the top bit in a size_t variable. */ + xBlockAllocatedBit = ( ( size_t ) 1 ) << ( ( sizeof( size_t ) * heapBITS_PER_BYTE ) - 1 ); +} +/*-----------------------------------------------------------*/ + +static void prvInsertBlockIntoFreeList( BlockLink_t *pxBlockToInsert ) +{ +BlockLink_t *pxIterator; +uint8_t *puc; + + /* Iterate through the list until a block is found that has a higher address + than the block being inserted. */ + for( pxIterator = &xStart; pxIterator->pxNextFreeBlock < pxBlockToInsert; pxIterator = pxIterator->pxNextFreeBlock ) + { + /* Nothing to do here, just iterate to the right position. */ + } + + /* Do the block being inserted, and the block it is being inserted after + make a contiguous block of memory? */ + puc = ( uint8_t * ) pxIterator; + if( ( puc + pxIterator->xBlockSize ) == ( uint8_t * ) pxBlockToInsert ) + { + pxIterator->xBlockSize += pxBlockToInsert->xBlockSize; + pxBlockToInsert = pxIterator; + } + else + { + mtCOVERAGE_TEST_MARKER(); + } + + /* Do the block being inserted, and the block it is being inserted before + make a contiguous block of memory? */ + puc = ( uint8_t * ) pxBlockToInsert; + if( ( puc + pxBlockToInsert->xBlockSize ) == ( uint8_t * ) pxIterator->pxNextFreeBlock ) + { + if( pxIterator->pxNextFreeBlock != pxEnd ) + { + /* Form one big block from the two blocks. */ + pxBlockToInsert->xBlockSize += pxIterator->pxNextFreeBlock->xBlockSize; + pxBlockToInsert->pxNextFreeBlock = pxIterator->pxNextFreeBlock->pxNextFreeBlock; + } + else + { + pxBlockToInsert->pxNextFreeBlock = pxEnd; + } + } + else + { + pxBlockToInsert->pxNextFreeBlock = pxIterator->pxNextFreeBlock; + } + + /* If the block being inserted plugged a gab, so was merged with the block + before and the block after, then it's pxNextFreeBlock pointer will have + already been set, and should not be set here as that would make it point + to itself. */ + if( pxIterator != pxBlockToInsert ) + { + pxIterator->pxNextFreeBlock = pxBlockToInsert; + } + else + { + mtCOVERAGE_TEST_MARKER(); + } +} +/*-----------------------------------------------------------*/ + +void vPortGetHeapStats( HeapStats_t *pxHeapStats ) +{ +BlockLink_t *pxBlock; +size_t xBlocks = 0, xMaxSize = 0, xMinSize = portMAX_DELAY; /* portMAX_DELAY used as a portable way of getting the maximum value. */ + + vTaskSuspendAll(); + { + pxBlock = xStart.pxNextFreeBlock; + + /* pxBlock will be NULL if the heap has not been initialised. The heap + is initialised automatically when the first allocation is made. */ + if( pxBlock != NULL ) + { + do + { + /* Increment the number of blocks and record the largest block seen + so far. */ + xBlocks++; + + if( pxBlock->xBlockSize > xMaxSize ) + { + xMaxSize = pxBlock->xBlockSize; + } + + if( pxBlock->xBlockSize < xMinSize ) + { + xMinSize = pxBlock->xBlockSize; + } + + /* Move to the next block in the chain until the last block is + reached. */ + pxBlock = pxBlock->pxNextFreeBlock; + } while( pxBlock != pxEnd ); + } + } + ( void ) xTaskResumeAll(); + + pxHeapStats->xSizeOfLargestFreeBlockInBytes = xMaxSize; + pxHeapStats->xSizeOfSmallestFreeBlockInBytes = xMinSize; + pxHeapStats->xNumberOfFreeBlocks = xBlocks; + + taskENTER_CRITICAL(); + { + pxHeapStats->xAvailableHeapSpaceInBytes = xFreeBytesRemaining; + pxHeapStats->xNumberOfSuccessfulAllocations = xNumberOfSuccessfulAllocations; + pxHeapStats->xNumberOfSuccessfulFrees = xNumberOfSuccessfulFrees; + pxHeapStats->xMinimumEverFreeBytesRemaining = xMinimumEverFreeBytesRemaining; + } + taskEXIT_CRITICAL(); +} + diff --git a/src/master_posix.c b/src/master_posix.c new file mode 100644 index 0000000..355960d --- /dev/null +++ b/src/master_posix.c @@ -0,0 +1,315 @@ + +/* + * TODO Do conditional includes based on which target we are building for. + * This should be specified in the master config file + * TODO This file should be moved to the posix port folder +*/ + +/* FreeRTOS includes. */ +/* #include "FreeRTOS_POSIX.h" */ + +/* System headers. */ +#include +#include +#include + +/* Demo includes. */ +/* #include "posix_demo.h" */ + +/* FreeRTOS+POSIX. */ +/* #include "FreeRTOS_POSIX/pthread.h" */ +/* #include "FreeRTOS_POSIX/mqueue.h" */ +/* #include "FreeRTOS_POSIX/time.h" */ +/* #include "FreeRTOS_POSIX/fcntl.h" */ +/* #include "FreeRTOS_POSIX/errno.h" */ + +#include +#include +#include +#include +/* Constants. */ +#define LINE_BREAK "\r\n" + +/** + * @brief Control messages. + * + * uint8_t is sufficient for this enum, that we are going to cast to char directly. + * If ever needed, implement a function to properly typecast. + */ +/**@{ */ +typedef enum ControlMessage +{ + eMSG_LOWER_INAVLID = 0x00, /**< Guard, let's not use 0x00 for messages. */ + eWORKER_CTRL_MSG_CONTINUE = 0x01, /**< Dispatcher to worker, distributing another job. */ + eWORKER_CTRL_MSG_EXIT = 0x02, /**< Dispatcher to worker, all jobs are finished and the worker receiving such can exit. */ + + /* define additional messages here */ + + eMSG_UPPER_INVALID = 0xFF /**< Guard, additional tasks shall be defined above. */ +} eControlMessage; +/**@} */ + +/** + * @defgroup Configuration constants for the dispatcher-worker demo. + */ +/**@{ */ +#define MQUEUE_NUMBER_OF_WORKERS ( 2 ) /**< The number of worker threads, each thread has one queue which is used as income box. */ + +#if ( MQUEUE_NUMBER_OF_WORKERS > 10 ) +#error "Please keep MQUEUE_NUMBER_OF_WORKERS < 10." +#endif + +#define MQUEUE_WORKER_QNAME_BASE "/qNode0" /**< Queue name base. */ +#define MQUEUE_WORKER_QNAME_BASE_LEN ( 6 ) /** Queue name base length. */ + +#define MQUEUE_TIMEOUT_SECONDS ( 1 ) /**< Relative timeout for mqueue functions. */ +#define MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER ( 1 ) /**< Maximum number of messages in a queue. */ + +#define MQUEUE_MSG_WORKER_CTRL_MSG_SIZE sizeof( uint8_t ) /**< Control message size. */ +#define DEMO_ERROR ( -1 ) /**< Any non-zero value would work. */ +/**@} */ + +/** + * @brief Structure used by Worker thread. + */ +/**@{ */ +typedef struct WorkerThreadResources +{ + pthread_t pxID; /**< thread ID. */ + mqd_t xInboxID; /**< mqueue inbox ID. */ +} WorkerThreadResources_t; +/**@} */ + +/** + * @brief Structure used by Dispatcher thread. + */ +/**@{ */ +typedef struct DispatcherThreadResources +{ + pthread_t pxID; /**< thread ID. */ + mqd_t * pOutboxID; /**< a list of mqueue outbox ID. */ +} DispatcherThreadResources_t; +/**@} */ + +/*-----------------------------------------------------------*/ + +static void * prvWorkerThread( void * pvArgs ) +{ + WorkerThreadResources_t pArgList = *( WorkerThreadResources_t * ) pvArgs; + + printf( "Worker thread #[%d] - start %s", ( int ) pArgList.pxID, LINE_BREAK ); + + struct timespec xReceiveTimeout = { 0 }; + + ssize_t xMessageSize = 0; + char pcReceiveBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 }; + + /* This is a worker thread that reacts based on what is sent to its inbox (mqueue). */ + while( true ) + { + clock_gettime( CLOCK_REALTIME, &xReceiveTimeout ); + xReceiveTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS; + + xMessageSize = mq_receive( pArgList.xInboxID, + pcReceiveBuffer, + MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, + 0 ); + + /* Parse messages */ + if( xMessageSize == MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ) + { + switch( ( int ) pcReceiveBuffer[ 0 ] ) + { + case eWORKER_CTRL_MSG_CONTINUE: + /* Task branch, currently only prints message to screen. */ + /* Could perform tasks here. Could also notify dispatcher upon completion, if desired. */ + printf( "Worker thread #[%d] -- Received eWORKER_CTRL_MSG_CONTINUE %s", ( int ) pArgList.pxID, LINE_BREAK ); + break; + + case eWORKER_CTRL_MSG_EXIT: + printf( "Worker thread #[%d] -- Finished. Exit now. %s", ( int ) pArgList.pxID, LINE_BREAK ); + + return NULL; + + default: + /* Received a message that we don't care or not defined. */ + break; + } + } + else + { + /* Invalid message. Error handling can be done here, if desired. */ + } + } + + /* You should never hit here. */ + /* return NULL; */ +} + +/*-----------------------------------------------------------*/ + +static void * prvDispatcherThread( void * pvArgs ) +{ + DispatcherThreadResources_t pArgList = *( DispatcherThreadResources_t * ) pvArgs; + + printf( "Dispatcher thread - start %s", LINE_BREAK ); + + struct timespec xSendTimeout = { 0 }; + + ssize_t xMessageSize = 0; + char pcSendBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 }; + + /* Just for fun, let threads do a total of 100 independent tasks. */ + int i = 0; + const int totalNumOfJobsPerThread = 100; + + /* Distribute 1000 independent tasks to workers, in round-robin fashion. */ + pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_CONTINUE; + + for( i = 0; i < totalNumOfJobsPerThread; i++ ) + { + clock_gettime( CLOCK_REALTIME, &xSendTimeout ); + xSendTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS; + + printf( "Dispatcher iteration #[%d] -- Sending msg to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK ); + + xMessageSize = mq_timedsend( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], + pcSendBuffer, + MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, + 0, + &xSendTimeout ); + + if( xMessageSize != 0 ) + { + /* This error is acceptable in our setup. + * Since inbox for each thread fits only one message. + * In reality, balance inbox size, message arrival rate, and message drop rate. */ + printf( "An acceptable failure -- dispatcher failed to send eWORKER_CTRL_MSG_CONTINUE to outbox ID: %x. errno %d %s", + ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], errno, LINE_BREAK ); + } + } + + /* Control thread is now done with distributing jobs. Tell workers they are done. */ + pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_EXIT; + + for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) + { + printf( "Dispatcher [%d] -- Sending eWORKER_CTRL_MSG_EXIT to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK ); + + /* This is a blocking call, to guarantee worker thread exits. */ + xMessageSize = mq_send( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], + pcSendBuffer, + MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, + 0 ); + } + + return NULL; +} + +/*-----------------------------------------------------------*/ + +/** + * @brief Job distribution with actor model. + * + * See the top of this file for detailed description. + */ +void vStartPOSIXMaster() +{ + int i = 0; + int iStatus = 0; + + /* Remove warnings about unused parameters. */ + + /* Handles of the threads and related resources. */ + DispatcherThreadResources_t pxDispatcher = { 0 }; + WorkerThreadResources_t pxWorkers[ MQUEUE_NUMBER_OF_WORKERS ] = { { 0 } }; + mqd_t workerMqueues[ MQUEUE_NUMBER_OF_WORKERS ] = { 0 }; + + struct mq_attr xQueueAttributesWorker = + { + .mq_flags = 0, + .mq_maxmsg = MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER, + .mq_msgsize = MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, + .mq_curmsgs = 0 + }; + + pxDispatcher.pOutboxID = workerMqueues; + + /* Create message queues for each worker thread. */ + for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) + { + /* Prepare a unique queue name for each worker. */ + char qName[] = MQUEUE_WORKER_QNAME_BASE; + qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i; + + /* Open a queue with -- + * O_CREAT -- create a message queue. + * O_RDWR -- both receiving and sending messages. + */ + pxWorkers[ i ].xInboxID = mq_open( qName, + O_CREAT | O_RDWR, + ( mode_t ) 0, + &xQueueAttributesWorker ); + + if( pxWorkers[ i ].xInboxID == ( mqd_t ) -1 ) + { + printf( "Invalid inbox (mqueue) for worker. %s", LINE_BREAK ); + iStatus = DEMO_ERROR; + break; + } + + /* Outboxes of dispatcher thread is the inboxes of all worker threads. */ + pxDispatcher.pOutboxID[ i ] = pxWorkers[ i ].xInboxID; + } + + /* Create and start Worker threads. */ + if( iStatus == 0 ) + { + for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) + { + ( void ) pthread_create( &( pxWorkers[ i ].pxID ), NULL, prvWorkerThread, &pxWorkers[ i ] ); + } + + /* Create and start dispatcher thread. */ + ( void ) pthread_create( &( pxDispatcher.pxID ), NULL, prvDispatcherThread, &pxDispatcher ); + + /* Actors will do predefined tasks in threads. Current implementation is that + * dispatcher actor notifies worker actors to terminate upon finishing distributing tasks. */ + + /* Wait for worker threads to join. */ + for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) + { + ( void ) pthread_join( pxWorkers[ i ].pxID, NULL ); + } + + /* Wait for dispatcher thread to join. */ + ( void ) pthread_join( pxDispatcher.pxID, NULL ); + } + + /* Close and unlink worker message queues. */ + for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) + { + char qName[] = MQUEUE_WORKER_QNAME_BASE; + qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i; + + if( pxWorkers[ i ].xInboxID != NULL ) + { + ( void ) mq_close( pxWorkers[ i ].xInboxID ); + ( void ) mq_unlink( qName ); + } + } + + /* Have something on console. */ + if( iStatus == 0 ) + { + printf( "All threads finished. %s", LINE_BREAK ); + } + else + { + printf( "Queues did not get initialized properly. Did not run demo. %s", LINE_BREAK ); + } + + /* This task was created with the native xTaskCreate() API function, so + must not run off the end of its implementing thread. */ + /* vTaskDelete( NULL ); */ +} diff --git a/src/master_rtos.c b/src/master_rtos.c new file mode 100644 index 0000000..4ca3461 --- /dev/null +++ b/src/master_rtos.c @@ -0,0 +1,147 @@ +/* FreeRTOS includes. */ +#include "FreeRTOS.h" +#include "task.h" + +/* System headers */ +#include + +/* Master include */ +#include "master_posix.h" + +/* Master task priority */ +#define mainPOSIX_MASTER_PRIORITY (tskIDLE_PRIORITY + 4) + +int main(void) +{ + configASSERT((mainPOSIX_MASTER_PRIORITY < configMAX_PRIORITIES)); + + /* const uint32_t ulLongTime_ms = pdMS_TO_TICKS(1000UL); */ + + /* Start the task to run POSIX Master */ + xTaskCreate(vStartPOSIXMaster, + "posix", + configMINIMAL_STACK_SIZE, + NULL, + mainPOSIX_MASTER_PRIORITY, + NULL); + + vTaskStartScheduler(); + + /* If all is well, the scheduler will now be running, and the following + * line will never be reached. If the following line does execute, then + * there was insufficient FreeRTOS heap memory available for the idle and/or + * timer tasks to be created. + */ + for(; ;) + { + /* Sleep(ulLongTime_ms); */ + } + + return 0; +} + + +/*-----------------------------------------------------------*/ + +void vAssertCalled(const char * pcFile, + uint32_t ulLine) +{ + /* const uint32_t ulLongSleep = 1000UL; */ + volatile uint32_t ulBlockVariable = 0UL; + volatile char * pcFileName = ( volatile char * ) pcFile; + volatile uint32_t ulLineNumber = ulLine; + + (void) pcFileName; + (void) ulLineNumber; + + printf("vAssertCalled %s, %ld\n", pcFile, (long) ulLine); + fflush(stdout); + + /* Setting ulBlockVariable to a non-zero value in the debugger will allow + * this function to be exited. */ + taskDISABLE_INTERRUPTS(); + { + while(ulBlockVariable == 0UL) + { + /* Sleep(ulLongSleep); */ + } + } + taskENABLE_INTERRUPTS(); +} + +/*-----------------------------------------------------------*/ + +/* configUSE_STATIC_ALLOCATION is set to 1, so the application must provide an + * implementation of vApplicationGetIdleTaskMemory() to provide the memory that is + * used by the Idle task. */ +void vApplicationGetIdleTaskMemory(StaticTask_t ** ppxIdleTaskTCBBuffer, + StackType_t ** ppxIdleTaskStackBuffer, + uint32_t * pulIdleTaskStackSize) +{ + /* If the buffers to be provided to the Idle task are declared inside this + * function then they must be declared static - otherwise they will be allocated on + * the stack and so not exists after this function exits. */ + static StaticTask_t xIdleTaskTCB; + static StackType_t uxIdleTaskStack[ configMINIMAL_STACK_SIZE ]; + + /* Pass out a pointer to the StaticTask_t structure in which the Idle + * task's state will be stored. */ + *ppxIdleTaskTCBBuffer = &xIdleTaskTCB; + + /* Pass out the array that will be used as the Idle task's stack. */ + *ppxIdleTaskStackBuffer = uxIdleTaskStack; + + /* Pass out the size of the array pointed to by *ppxIdleTaskStackBuffer. + * Note that, as the array is necessarily of type StackType_t, + * configMINIMAL_STACK_SIZE is specified in words, not bytes. */ + *pulIdleTaskStackSize = configMINIMAL_STACK_SIZE; +} + +/*-----------------------------------------------------------*/ + +/* configUSE_STATIC_ALLOCATION and configUSE_TIMERS are both set to 1, so the + * application must provide an implementation of vApplicationGetTimerTaskMemory() + * to provide the memory that is used by the Timer service task. */ +void vApplicationGetTimerTaskMemory(StaticTask_t ** ppxTimerTaskTCBBuffer, + StackType_t ** ppxTimerTaskStackBuffer, + uint32_t * pulTimerTaskStackSize) +{ + /* If the buffers to be provided to the Timer task are declared inside this + * function then they must be declared static - otherwise they will be allocated on + * the stack and so not exists after this function exits. */ + static StaticTask_t xTimerTaskTCB; + static StackType_t uxTimerTaskStack[ configTIMER_TASK_STACK_DEPTH ]; + + /* Pass out a pointer to the StaticTask_t structure in which the Timer + * task's state will be stored. */ + *ppxTimerTaskTCBBuffer = &xTimerTaskTCB; + + /* Pass out the array that will be used as the Timer task's stack. */ + *ppxTimerTaskStackBuffer = uxTimerTaskStack; + + /* Pass out the size of the array pointed to by *ppxTimerTaskStackBuffer. + * Note that, as the array is necessarily of type StackType_t, + * configTIMER_TASK_STACK_DEPTH is specified in words, not bytes. */ + *pulTimerTaskStackSize = configTIMER_TASK_STACK_DEPTH; +} + +/*-----------------------------------------------------------*/ + +/** + * @brief Warn user if pvPortMalloc fails. + * + * Called if a call to pvPortMalloc() fails because there is insufficient + * free memory available in the FreeRTOS heap. pvPortMalloc() is called + * internally by FreeRTOS API functions that create tasks, queues, software + * timers, and semaphores. The size of the FreeRTOS heap is set by the + * configTOTAL_HEAP_SIZE configuration constant in FreeRTOSConfig.h. + * + */ +void vApplicationMallocFailedHook() +{ + taskDISABLE_INTERRUPTS(); + + for(;;) + { + } +} diff --git a/src/stream_i2c.c b/src/stream_i2c.c deleted file mode 100644 index b149756..0000000 --- a/src/stream_i2c.c +++ /dev/null @@ -1,37 +0,0 @@ -#include "stream_i2c.h" -#include "port.h" -#include "stream.h" - -int i2c_read(uint8_t* buf, size_t count, void **vptr, void *sptr) -{ - p_stream_t *stream = (p_stream_t*)sptr; - I2C_HandleTypeDef dev = *(I2C_HandleTypeDef*)stream->props[DEVICE]; - - uint16_t addr = *(uint16_t*)vptr[0]; - uint32_t timeout = *(uint32_t*)vptr[1]; - uint16_t AF_limit = *(uint32_t*)vptr[2]; - - int error, AF_counter = 0; - while (HAL_I2C_Master_Receive(&dev, addr, buf, count, timeout) != HAL_OK) { - if ((error = HAL_I2C_GetError(&dev)) != HAL_I2C_ERROR_AF) { - return error; - } - else if (++AF_counter > AF_limit) { - return HAL_I2C_ERROR_AF; - } - } - return 0; -} - -int i2c_write(uint8_t* buf, size_t count, void **vptr, void *sptr) -{ - p_stream_t *stream = (p_stream_t*)sptr; - I2C_HandleTypeDef dev = *(I2C_HandleTypeDef*)stream->props[DEVICE]; - uint16_t addr = *(uint16_t*)vptr[1]; - uint32_t timeout = *(uint32_t*)vptr[2]; - - while (HAL_I2C_Master_Transmit(&dev, addr, buf, count, timeout) != HAL_OK) { - return HAL_I2C_GetError(&dev); - } - return 0; -} diff --git a/src/stream_stdio.c b/src/stream_stdio.c deleted file mode 100644 index efa4b13..0000000 --- a/src/stream_stdio.c +++ /dev/null @@ -1,18 +0,0 @@ -#include -#include "stream_stdio.h" -#include "stream.h" - -int stdio_read(uint8_t* buf, size_t count, void **vptr, void *sptr) -{ - int x; - for (x = 0; x < count; x++) { - scanf("%c", &buf[x]); - } - return 0; -} - -int stdio_write(uint8_t* buf, size_t count, void **vptr, void *sptr) -{ - printf("%s\n", buf); - return 0; -} -- cgit v1.2.3