/* * TODO Do conditional includes based on which target we are building for. * Actually, target-specific includes should go in the port folder * This should be specified in the master config file * TODO This file should be moved to the posix port folder */ /* FreeRTOS includes. */ /* #include "FreeRTOS_POSIX.h" */ /* System headers. */ #include #include #include /* Library includes */ #include #include /* Project includes. */ #include "master_posix.h" #include "main.h" #include "devices.h" #include "config.h" #include "dataflow.h" #include "handshake.pb.h" #include "data.pb.h" #include "stream.h" #include "stream_stdio.h" /* FreeRTOS+POSIX. should go in the port folder */ /* #include "FreeRTOS_POSIX/pthread.h" */ /* #include "FreeRTOS_POSIX/mqueue.h" */ /* #include "FreeRTOS_POSIX/time.h" */ /* #include "FreeRTOS_POSIX/fcntl.h" */ /* #include "FreeRTOS_POSIX/errno.h" */ /* Constants. */ #define LINE_BREAK "\r\n" #define device_MDR s2m_MDR_response #define GET_IDX_FROM_ADDR(i2c_addr) (i2c_addr>>1)-1 #define GET_ADDR_FROM_IDX(idx) (idx+1)<<1 #define GET_BIT_FROM_IDX(a, b) a[b>>5]&(1<<(b%32)) #define SET_BIT_FROM_IDX(a, b) a[b>>5]|=(1<<(b%32)) #define COUNTOF(__BUFFER__) (sizeof(__BUFFER__) / sizeof(*(__BUFFER__))) #define BUS_DEVICE_LIMIT 16 device_info_t *device_info[BUS_DEVICE_LIMIT] = {NULL}; subscription_info_t* subs_info[BUS_DEVICE_LIMIT]; uint32_t allocated[4]={0}; uint8_t dev_sts[BUS_DEVICE_LIMIT] = {OFFLINE}; uint8_t data_idx; uint8_t *data_rbuf[ROUTING_BUFSIZE]; /*< Buffer to store data to be routed */ uint8_t *cmd_routing_buf[ROUTING_BUFSIZE]; /*< Buffer to store commands to be routed */ uint8_t data_src_idx_rbuf[ROUTING_BUFSIZE]; /*< Index information for data source */ uint8_t cmd_src_idx_rbuf[ROUTING_BUFSIZE]; /*< Index information for command source */ uint8_t cmd_dst_idx_rbuf[ROUTING_BUFSIZE]; /*< Index information for command dest */ uint32_t data_len_buf[ROUTING_BUFSIZE]; uint32_t data_routing_ptr = 0; /*< Pointer to tail of both data and data index buffers */ uint32_t cmd_routing_ptr = 0; /*< Pointer to tail of cmd and cmd index buffers */ p_stream_t stream = {&stdio_read, &stdio_write, NULL}; static void *handshake_func(void * pvArgs); hs_status_t handshake(uint32_t i2c_addr); dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, uint8_t routing_buf_idx); bool routing(void); bool cmd_routing(void); uint8_t get_CTS(uint8_t i2c_addr); bool todo_hs_or_not_todo_hs(uint8_t i2c_addr); state_t get_state_from_hs_status(uint16_t device_addr, hs_status_t hs_status); bool decode_subscriptions_callback(pb_istream_t *istream, const pb_field_t *field, void **args); bool encode_subscription_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg); bool encode_datapoint_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg); bool decode_data_callback(pb_istream_t *istream, const pb_field_t *field, void **args); bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg); /** * @brief Control messages. * * uint8_t is sufficient for this enum, that we are going to cast to char directly. * If ever needed, implement a function to properly typecast. */ /**@{ */ typedef enum ControlMessage { eMSG_LOWER_INAVLID = 0x00, /**< Guard, let's not use 0x00 for messages. */ eWORKER_CTRL_MSG_CONTINUE = 0x01, /**< Dispatcher to worker, distributing another job. */ eWORKER_CTRL_MSG_EXIT = 0x02, /**< Dispatcher to worker, all jobs are finished and the worker receiving such can exit. */ /* define additional messages here */ eMSG_UPPER_INVALID = 0xFF /**< Guard, additional tasks shall be defined above. */ } eControlMessage; /**@} */ /** * @defgroup Configuration constants for the dispatcher-worker demo. */ /**@{ */ #define MQUEUE_NUMBER_OF_WORKERS ( 2 ) /**< The number of worker threads, each thread has one queue which is used as income box. */ #if ( MQUEUE_NUMBER_OF_WORKERS > 10 ) #error "Please keep MQUEUE_NUMBER_OF_WORKERS < 10." #endif #define MQUEUE_WORKER_QNAME_BASE "/qNode0" /**< Queue name base. */ #define MQUEUE_WORKER_QNAME_BASE_LEN ( 6 ) /** Queue name base length. */ #define MQUEUE_TIMEOUT_SECONDS ( 1 ) /**< Relative timeout for mqueue functions. */ #define MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER ( 1 ) /**< Maximum number of messages in a queue. */ #define MQUEUE_MSG_WORKER_CTRL_MSG_SIZE sizeof( uint8_t ) /**< Control message size. */ #define DEMO_ERROR ( -1 ) /**< Any non-zero value would work. */ /**@} */ /** * @brief Structure used by Worker thread. */ /**@{ */ typedef struct WorkerThreadResources { pthread_t pxID; /**< thread ID. */ mqd_t xInboxID; /**< mqueue inbox ID. */ } WorkerThreadResources_t; /**@} */ /** * @brief Structure used by Dispatcher thread. */ /**@{ */ typedef struct DispatcherThreadResources { pthread_t pxID; /**< thread ID. */ mqd_t * pOutboxID; /**< a list of mqueue outbox ID. */ } DispatcherThreadResources_t; /**@} */ /*-----------------------------------------------------------*/ static void * prvWorkerThread( void * pvArgs ) { WorkerThreadResources_t pArgList = *( WorkerThreadResources_t * ) pvArgs; printf( "Worker thread #[%d] - start %s", ( int ) pArgList.pxID, LINE_BREAK ); struct timespec xReceiveTimeout = { 0 }; ssize_t xMessageSize = 0; char pcReceiveBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 }; /* This is a worker thread that reacts based on what is sent to its inbox (mqueue). */ while( true ) { clock_gettime( CLOCK_REALTIME, &xReceiveTimeout ); xReceiveTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS; xMessageSize = mq_receive( pArgList.xInboxID, pcReceiveBuffer, MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, 0 ); /* Parse messages */ if( xMessageSize == MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ) { switch( ( int ) pcReceiveBuffer[ 0 ] ) { case eWORKER_CTRL_MSG_CONTINUE: /* Task branch, currently only prints message to screen. */ /* Could perform tasks here. Could also notify dispatcher upon completion, if desired. */ printf( "Worker thread #[%d] -- Received eWORKER_CTRL_MSG_CONTINUE %s", ( int ) pArgList.pxID, LINE_BREAK ); break; case eWORKER_CTRL_MSG_EXIT: printf( "Worker thread #[%d] -- Finished. Exit now. %s", ( int ) pArgList.pxID, LINE_BREAK ); return NULL; default: /* Received a message that we don't care or not defined. */ break; } } else { /* Invalid message. Error handling can be done here, if desired. */ } } /* You should never hit here. */ /* return NULL; */ } /*-----------------------------------------------------------*/ static void * prvDispatcherThread( void * pvArgs ) { DispatcherThreadResources_t pArgList = *( DispatcherThreadResources_t * ) pvArgs; printf( "Dispatcher thread - start %s", LINE_BREAK ); struct timespec xSendTimeout = { 0 }; ssize_t xMessageSize = 0; char pcSendBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 }; /* Just for fun, let threads do a total of 100 independent tasks. */ int i = 0; const int totalNumOfJobsPerThread = 100; /* Distribute 1000 independent tasks to workers, in round-robin fashion. */ pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_CONTINUE; for( i = 0; i < totalNumOfJobsPerThread; i++ ) { clock_gettime( CLOCK_REALTIME, &xSendTimeout ); xSendTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS; printf( "Dispatcher iteration #[%d] -- Sending msg to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK ); xMessageSize = mq_timedsend( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], pcSendBuffer, MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, 0, &xSendTimeout ); if( xMessageSize != 0 ) { /* This error is acceptable in our setup. * Since inbox for each thread fits only one message. * In reality, balance inbox size, message arrival rate, and message drop rate. */ printf( "An acceptable failure -- dispatcher failed to send eWORKER_CTRL_MSG_CONTINUE to outbox ID: %x. errno %d %s", ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], errno, LINE_BREAK ); } } /* Control thread is now done with distributing jobs. Tell workers they are done. */ pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_EXIT; for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) { printf( "Dispatcher [%d] -- Sending eWORKER_CTRL_MSG_EXIT to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK ); /* This is a blocking call, to guarantee worker thread exits. */ xMessageSize = mq_send( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], pcSendBuffer, MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, 0 ); } return NULL; } /*-----------------------------------------------------------*/ /** * @brief Job distribution with actor model. * * See the top of this file for detailed description. */ void vStartPOSIXMaster(void *pvParams) { int i = 0; int iStatus = 0; pthread_t handshake_thread; pthread_create(&handshake_thread, NULL, handshake_func, NULL); /* Remove warnings about unused parameters. */ /* Handles of the threads and related resources. */ DispatcherThreadResources_t pxDispatcher = { 0 }; WorkerThreadResources_t pxWorkers[ MQUEUE_NUMBER_OF_WORKERS ] = { { 0 } }; mqd_t workerMqueues[ MQUEUE_NUMBER_OF_WORKERS ] = { 0 }; struct mq_attr xQueueAttributesWorker = { .mq_flags = 0, .mq_maxmsg = MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER, .mq_msgsize = MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, .mq_curmsgs = 0 }; pxDispatcher.pOutboxID = workerMqueues; /* Create message queues for each worker thread. */ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) { /* Prepare a unique queue name for each worker. */ char qName[] = MQUEUE_WORKER_QNAME_BASE; qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i; /* Open a queue with -- * O_CREAT -- create a message queue. * O_RDWR -- both receiving and sending messages. */ pxWorkers[ i ].xInboxID = mq_open( qName, O_CREAT | O_RDWR, ( mode_t ) 0, &xQueueAttributesWorker ); if( pxWorkers[ i ].xInboxID == ( mqd_t ) -1 ) { printf( "Invalid inbox (mqueue) for worker. %s", LINE_BREAK ); iStatus = DEMO_ERROR; break; } /* Outboxes of dispatcher thread is the inboxes of all worker threads. */ pxDispatcher.pOutboxID[ i ] = pxWorkers[ i ].xInboxID; } /* Create and start Worker threads. */ if( iStatus == 0 ) { for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) { ( void ) pthread_create( &( pxWorkers[ i ].pxID ), NULL, prvWorkerThread, &pxWorkers[ i ] ); } /* Create and start dispatcher thread. */ ( void ) pthread_create( &( pxDispatcher.pxID ), NULL, prvDispatcherThread, &pxDispatcher ); /* Actors will do predefined tasks in threads. Current implementation is that * dispatcher actor notifies worker actors to terminate upon finishing distributing tasks. */ /* Wait for worker threads to join. */ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) { ( void ) pthread_join( pxWorkers[ i ].pxID, NULL ); } /* Wait for dispatcher thread to join. */ ( void ) pthread_join( pxDispatcher.pxID, NULL ); } /* Close and unlink worker message queues. */ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) { char qName[] = MQUEUE_WORKER_QNAME_BASE; qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i; if( pxWorkers[ i ].xInboxID != NULL ) { ( void ) mq_close( pxWorkers[ i ].xInboxID ); ( void ) mq_unlink( qName ); } } /* Have something on console. */ if( iStatus == 0 ) { printf( "All threads finished. %s", LINE_BREAK ); } else { printf( "Queues did not get initialized properly. Did not run demo. %s", LINE_BREAK ); } /* This task was created with the native xTaskCreate() API function, so must not run off the end of its implementing thread. */ /* vTaskDelete( NULL ); */ } static void *handshake_func(void * pvArgs) { hs_status_t hs_status; printf("Handshake thread started %s", LINE_BREAK); for (;;) { for (int dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT-1; dev_idx++) { if (todo_hs_or_not_todo_hs(GET_ADDR_FROM_IDX(dev_idx))) { hs_status = handshake(GET_ADDR_FROM_IDX(dev_idx)); dev_sts[dev_idx] = get_state_from_hs_status(GET_ADDR_FROM_IDX(dev_idx), hs_status); } } } return NULL; } hs_status_t handshake(uint32_t device_id) { /* Handshake variables */ uint8_t hs_sts = IDLE; uint8_t *MDR_buf; uint32_t dev_idx = GET_IDX_FROM_ADDR(i2c_addr); uint16_t MDR_len = 0; void **vptr = malloc(sizeof(uint8_t*)*2); vptr[0] = malloc(sizeof(uint8_t*)); vptr[0] = &i2c_addr; s2m_MDR_response MDR_res_message = s2m_MDR_response_init_default; while (hs_sts != HS_FAILED && hs_sts != HS_REGISTERED) { switch (hs_sts) { case (IDLE): { uint8_t MDR_req_buf[2] = {0x0, 0x1}; if (stream.write(MDR_req_buf, 2, vptr, &stream) != 0) { hs_sts = HS_FAILED; } else { hs_sts = HS_MDR_ACK; } break; } case (HS_MDR_ACK): { uint8_t MDR_ACK_buf[2] = {0x0, 0x0}; if (stream.read(MDR_ACK_buf, 2, vptr, &stream) != 0) { hs_sts = HS_FAILED; } else { uint8_t ACK_flag = MDR_ACK_buf[1]; if (ACK_flag == 0xFF) { MDR_len = MDR_ACK_buf[0]; hs_sts = HS_MDR_CTS; } else { hs_sts = HS_FAILED; } } break; } case (HS_MDR_CTS): { uint8_t MDR_CTS_buf[2] = {0x0, 0x02}; if (stream.write(MDR_CTS_buf, 2, vptr, &stream) != 0) { hs_sts = HS_FAILED; } else { hs_sts = HS_MDR_MDR; } break; } case (HS_MDR_MDR): { MDR_buf = (uint8_t*)malloc(MDR_len); if (stream.read(MDR_buf, MDR_len, vptr, &stream) != 0) { hs_sts = HS_FAILED; } else { MDR_res_message.subscriptions.funcs.decode = decode_subscriptions_callback; MDR_res_message.subscriptions.arg = (void*)dev_idx; pb_istream_t MDR_res_stream = pb_istream_from_buffer(MDR_buf, MDR_len); if (!pb_decode(&MDR_res_stream, s2m_MDR_response_fields, &MDR_res_message)) { hs_sts = HS_FAILED; } else { device_info[dev_idx] = malloc(sizeof(device_info_t)); device_info[dev_idx]->i2c_addr = i2c_addr; device_info[dev_idx]->device_id = dev_idx; device_info[dev_idx]->MDR = MDR_res_message; hs_sts = HS_REGISTERED; } } break; } } } return hs_sts; } bool todo_hs_or_not_todo_hs(uint8_t i2c_addr) { uint8_t device_idx = GET_IDX_FROM_ADDR(i2c_addr); state_t device_curr_state = dev_sts[device_idx]; bool do_hs = false; switch(device_curr_state) { case NO_HS: case CONNECTED: case FAILED: case OFFLINE: do_hs = true; break; case REGISTERED: case NO_DATA: do_hs = false; break; } return do_hs; } state_t get_state_from_hs_status(uint16_t device_addr, hs_status_t hs_status) { state_t device_state = OFFLINE; switch(hs_status) { case IDLE: case HS_FAILED: device_state = OFFLINE; break; case HS_MDR_ACK: case HS_MDR_CTS: case HS_MDR_MDR: device_state = FAILED; break; case HS_REGISTERED: device_state = REGISTERED; break; } return device_state; } bool decode_subscriptions_callback(pb_istream_t *istream, const pb_field_t *field, void **args) { _subscriptions subs; int *subs_idx = (int*)args; /* Check is storage is allocated; if not, allocate it */ if ((GET_BIT_FROM_IDX(allocated, *subs_idx)) == 0) { subs_info[*subs_idx] = (subscription_info_t*)malloc(sizeof(subscription_info_t)); SET_BIT_FROM_IDX(allocated, *subs_idx); subs_info[*subs_idx]->mod_idx = subs_info[*subs_idx]->entity_idx = subs_info[*subs_idx]->class_idx = subs_info[*subs_idx]->i2c_idx = 0; } if(!pb_decode(istream, _subscriptions_fields, &subs)) return false; /* Parse all fields if they're included */ if (subs.has_module_id) subs_info[*subs_idx]->module_ids[subs_info[*subs_idx]->mod_idx++] = subs.module_id; if (subs.has_entity_id) subs_info[*subs_idx]->entity_ids[subs_info[*subs_idx]->entity_idx++] = subs.entity_id; if (subs.has_module_class) subs_info[*subs_idx]->module_class[subs_info[*subs_idx]->class_idx++] = subs.module_class; if (subs.has_i2c_address) subs_info[*subs_idx]->i2c_address[subs_info[*subs_idx]->i2c_idx++] = subs.i2c_address; return true; }