diff options
| author | Aditya Naik | 2020-08-03 18:11:57 -0400 |
|---|---|---|
| committer | Aditya Naik | 2020-08-03 18:11:57 -0400 |
| commit | 5154fdecac5260ea3a2fd6416c1186eb4d609449 (patch) | |
| tree | 91df6c6955a6fb11706920cd5d922d834c170bc4 | |
| parent | 8e1133fcf30fbb4e1d406c47cb87d592c9742c4f (diff) | |
Added all functions into threads; cleaup
| -rw-r--r-- | include/main.h | 49 | ||||
| -rw-r--r-- | include/stream.h | 7 | ||||
| -rw-r--r-- | ports/posix/src/stream_stdio.c | 13 | ||||
| -rw-r--r-- | ports/stm32f4/src/port.h | 3 | ||||
| -rw-r--r-- | ports/stm32f4/src/stream_i2c.c | 2 | ||||
| -rw-r--r-- | src/master.c | 211 | ||||
| -rw-r--r-- | src/master_posix.c | 599 |
7 files changed, 381 insertions, 503 deletions
diff --git a/include/main.h b/include/main.h index 6579484..60fe0a2 100644 --- a/include/main.h +++ b/include/main.h @@ -1,25 +1,11 @@ -/* USER CODE BEGIN Header */
/**
******************************************************************************
* @file : main.h
* @brief : Header for main.c file.
* This file contains the common defines of the application.
******************************************************************************
- * @attention
- *
- * <h2><center>© Copyright (c) 2020 STMicroelectronics.
- * All rights reserved.</center></h2>
- *
- * This software component is licensed by ST under BSD 3-Clause license,
- * the "License"; You may not use this file except in compliance with the
- * License. You may obtain a copy of the License at:
- * opensource.org/licenses/BSD-3-Clause
- *
- ******************************************************************************
*/
-/* USER CODE END Header */
-/* Define to prevent recursive inclusion -------------------------------------*/
#ifndef __MAIN_H
#define __MAIN_H
@@ -27,43 +13,12 @@ extern "C" {
#endif
-/* Includes ------------------------------------------------------------------*/
#include "port.h"
-/* Private includes ----------------------------------------------------------*/
-/* USER CODE BEGIN Includes */
-
-/* USER CODE END Includes */
-
-/* Exported types ------------------------------------------------------------*/
-/* USER CODE BEGIN ET */
-
-/* USER CODE END ET */
-
-/* Exported constants --------------------------------------------------------*/
-/* USER CODE BEGIN EC */
-
-/* USER CODE END EC */
-
-/* Exported macro ------------------------------------------------------------*/
-/* USER CODE BEGIN EM */
-
-/* USER CODE END EM */
-
-/* Exported functions prototypes ---------------------------------------------*/
void Error_Handler(void);
-/* USER CODE BEGIN EFP */
-
-/* USER CODE END EFP */
-
-/* Private defines -----------------------------------------------------------*/
-#define led_Pin GPIO_PIN_13
-#define led_GPIO_Port GPIOC
-/* USER CODE BEGIN Private defines */
-
-/* USER CODE END Private defines */
-
+#define BUS_DEVICE_LIMIT 16
+
#ifdef __cplusplus
}
#endif
diff --git a/include/stream.h b/include/stream.h index 6bdc29e..96d9823 100644 --- a/include/stream.h +++ b/include/stream.h @@ -1,10 +1,11 @@ #include <stdint.h> #include <stddef.h> -typedef struct p_stream_s p_stream_t; +/* typedef struct p_stream_s p_stream_t; */ -struct p_stream_s { +typedef struct { int (*read)(uint8_t *buf, size_t len, void **vptr, void *sptr); int (*write)(uint8_t *buf, size_t len, void **vptr, void *sptr); + int (*init)(void **vptr, void *sptr); void **props; -}; +} p_stream_t; diff --git a/ports/posix/src/stream_stdio.c b/ports/posix/src/stream_stdio.c index ed83e7b..e9b6f09 100644 --- a/ports/posix/src/stream_stdio.c +++ b/ports/posix/src/stream_stdio.c @@ -1,6 +1,19 @@ #include <stdio.h> +#include <stdlib.h> #include "stream_stdio.h" #include "stream.h" +#include "main.h" + +int stdio_init(void **vptr, void *sptr) +{ + /* TODO */ + /* p_stream_t *stream = (p_stream_t*)sptr; */ + /* int num_files = BUS_DEVICE_LIMIT; */ + /* int x; */ + /* char fname_base[] = "devio_", fname[10]; */ + + return 0; +} int stdio_read(uint8_t* buf, size_t count, void **vptr, void *sptr) { diff --git a/ports/stm32f4/src/port.h b/ports/stm32f4/src/port.h index ffbbe9e..cb55268 100644 --- a/ports/stm32f4/src/port.h +++ b/ports/stm32f4/src/port.h @@ -9,4 +9,7 @@ #include "stm32f4xx_hal.h" +#define led_Pin GPIO_PIN_13 +#define led_GPIO_Port GPIOC + #endif diff --git a/ports/stm32f4/src/stream_i2c.c b/ports/stm32f4/src/stream_i2c.c index 3e8fc95..8c6c65b 100644 --- a/ports/stm32f4/src/stream_i2c.c +++ b/ports/stm32f4/src/stream_i2c.c @@ -19,7 +19,7 @@ int i2c_read(uint8_t* buf, size_t count, void **vptr, void *sptr) else if (++AF_counter > AF_limit) { return HAL_I2C_ERROR_AF; } - } + } return 0; } diff --git a/src/master.c b/src/master.c index 99cbea5..969fca5 100644 --- a/src/master.c +++ b/src/master.c @@ -36,7 +36,7 @@ #define COUNTOF(__BUFFER__) (sizeof(__BUFFER__) / sizeof(*(__BUFFER__))) /* #define I2C_ADDRESS 0x05 */ -#define BUS_DEVICE_LIMIT 16 + /* Macro to toggle between master and slave firmware */ #define MASTER @@ -160,7 +160,7 @@ hs_status_t handshake(uint32_t i2c_addr) uint32_t dev_idx = GET_IDX_FROM_ADDR(i2c_addr); uint16_t MDR_len = 0; - void **vptr = malloc(sizeof(uint8_t*)*2); + void **vptr = malloc(sizeof(uint8_t*)); vptr[0] = malloc(sizeof(uint8_t*)); vptr[0] = &i2c_addr; @@ -338,15 +338,10 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile switch (df_status) { case (DF_IDLE): { - HAL_Delay(MASTER_I2C_BUS_INTERVAL); uint8_t SOR_buf[2] = {SOR_code, 0x0}; - if (HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, SOR_buf, 2, 500) != HAL_OK) { + if (stream.write(SOR_buf, 2, vptr, &stream) != 0) { df_status = DF_FAIL; -#ifdef DEBUG_ENABLE - goto __DF_SOR_I2C_ERROR; - __DF_SOR_I2C_ERROR_END: - __asm__("nop"); -#endif + break; } else { if (SOR_code == SLAVE_TX) { @@ -361,29 +356,14 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile } break; } - case (DF_RX_DOC): { - HAL_Delay(MASTER_I2C_BUS_INTERVAL); - AF_error_counter = 0; - while (HAL_I2C_Master_Receive(&hi2c1, (uint16_t)i2c_addr, - (uint8_t*)DOC_buf, 4, 0xffff) != HAL_OK) { - if (HAL_I2C_GetError(&hi2c1) != HAL_I2C_ERROR_AF) { - df_status = DF_FAIL; -#ifdef DEBUG_ENABLE - goto __DF_DOC_I2C_ERROR; - __DF_DOC_I2C_ERROR_END: - __asm__("nop"); -#endif - break; - } - else if (++AF_error_counter > 1500) { - df_status = DF_FAIL; - break; - } + if (stream.read(DOC_buf, 4, vptr, &stream) != 0) { + df_status = DF_FAIL; + break; } - if (df_status != DF_FAIL) { + else { if (DOC_buf[0] == 0x0) { /* Do nothing DOC; should become redundant once dataflow is initiated using classes */ df_status = DF_SUCCESS; @@ -409,16 +389,11 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile } break; } + case (DF_CTS): { - HAL_Delay(MASTER_I2C_BUS_INTERVAL); - if (HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, CTS_buf, 2, 1000) != HAL_OK) { + if (stream.write(CTS_buf, 2, vptr, &stream) != 0) { df_status = DF_FAIL; -#ifdef DEBUG_ENABLE - goto __DF_CTS_I2C_ERROR; - __DF_CTS_I2C_ERROR_END: - __asm__("nop"); -#endif } else { if (DOC_buf[1] == DATA) { @@ -432,42 +407,26 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile } break; } + case (DF_RX_DATA): { - HAL_Delay(MASTER_I2C_BUS_INTERVAL); data_rbuf[data_routing_ptr] = malloc(sizeof(uint8_t)*data_len); - AF_error_counter = 0; - while (HAL_I2C_Master_Receive(&hi2c1, (uint16_t)i2c_addr, - (uint8_t*)data_rbuf[data_routing_ptr], - data_len, 1000) != HAL_OK) { - if (HAL_I2C_GetError(&hi2c1) != HAL_I2C_ERROR_AF) { - df_status = DF_FAIL; -#ifdef DEBUG_ENABLE - goto __DF_DATA_I2C_ERROR; - __DF_DATA_I2C_ERROR_END: - __asm__("nop"); -#endif - break; - } - else if (++AF_error_counter > 1500) { - df_status = DF_FAIL; - break; - } + if (stream.read(data_rbuf[data_routing_ptr], data_len, vptr, &stream) != 0) { + df_status = DF_FAIL; + free(data_rbuf[data_routing_ptr]); + break; } - if (df_status != DF_FAIL) { + else { data_src_idx_rbuf[data_routing_ptr] = dev_idx; data_len_buf[data_routing_ptr] = (uint8_t)data_len; data_routing_ptr++; df_status = DF_SUCCESS; } - else { - free(data_rbuf[data_routing_ptr]); - } break; } + case (DF_TX_DATA): { - HAL_Delay(MASTER_I2C_BUS_INTERVAL); /* TODO error checking */ /* Will need to package datapoint and MDR to know their lengths Once cached, will not need to do this */ @@ -486,66 +445,32 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile uint8_t data_MDR_len_buf[4] = {0, MDR_len, 0, data_len}; uint8_t status = get_CTS(i2c_addr); - HAL_Delay(MASTER_I2C_BUS_INTERVAL); - if (status != 0 && - HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, - data_MDR_len_buf, 4, 10000) == HAL_OK) { -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "MDR len: %d data len: %d SENT\r\n", MDR_len, data_len); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif - } - else { -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Failed to send lengths\r\n"); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif - } + + if (status == 0 || + stream.write(data_MDR_len_buf, 4, vptr, &stream) != 0); + status = get_CTS(i2c_addr); if (status != 0 && - HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, MDR_buf, - MDR_len, 10000) == HAL_OK && - HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, data_rbuf[rbuf_idx], - data_len, 10000) == HAL_OK) { + stream.write(MDR_buf, MDR_len, vptr, &stream) == 0 && + stream.write(data_rbuf[rbuf_idx], data_len) == 0) { df_status = DF_SUCCESS; -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Data and MDR sent\r\n"); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif } else { df_status = DF_FAIL; } break; } + case (DF_RX_CMD): { - uint8_t *cmd_buf; - HAL_Delay(MASTER_I2C_BUS_INTERVAL); + uint8_t *cmd_buf; cmd_buf = (uint8_t*)malloc(data_len); - AF_error_counter = 0; - while (HAL_I2C_Master_Receive(&hi2c1, (uint16_t)i2c_addr, cmd_buf, - data_len, 10000) != HAL_OK) { - if (HAL_I2C_GetError(&hi2c1) != HAL_I2C_ERROR_AF) { - df_status = DF_FAIL; - } - if (++AF_error_counter > 3000) { - df_status = DF_FAIL; - } - if (df_status == DF_FAIL) { - free(cmd_buf); -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Failed to get command, expected len: %ld\r\n", data_len); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif - break; - } + + if (stream.read(cmd_buf, data_len, vptr, &stream) != 0) { + free(cmd_buf); + df_status = DF_FAIL; } - if (df_status != DF_FAIL) { + else { cmd_routing_buf[cmd_routing_ptr] = malloc(sizeof(uint8_t)*data_len); memcpy(cmd_routing_buf[cmd_routing_ptr], cmd_buf, data_len); free(cmd_buf); @@ -553,54 +478,31 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile cmd_src_idx_rbuf[cmd_routing_ptr] = dev_idx; cmd_dst_idx_rbuf[cmd_routing_ptr] = cmd_dest; cmd_routing_ptr++; - -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Got command\r\n"); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif + df_status = DF_SUCCESS; } break; } + case (DF_TX_CMD): { - HAL_Delay(MASTER_I2C_BUS_INTERVAL); uint8_t data_len = sizeof(cmd_routing_buf[rbuf_idx]); uint8_t len_buf[] = {0x0, data_len, 0x0, 0x0}; - uint8_t status = get_CTS(i2c_addr); + uint8_t status = get_CTS(i2c_addr); if (status != 0 && - HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, len_buf, - 4, 10000) != HAL_OK) { + stream.write(len_buf, 4, vptr, &stream) == 0) { df_status = DF_FAIL; -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Failed to send cmd len buf to %d\r\n", i2c_addr>>1); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif } if (df_status != DF_FAIL) { uint8_t status = get_CTS(i2c_addr); if (status != 0 && - HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, - cmd_routing_buf[rbuf_idx], - 4, 10000) == HAL_OK) { + stream.write(cmd_routing_buf[rbuf_idx], vptr, &stream) == 0) { df_status = DF_SUCCESS; -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Routed cmd to %d\r\n", i2c_addr>>1); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif } else { df_status = DF_FAIL; -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Failed to send cmd to %d\r\n", i2c_addr>>1); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); -#endif } } break; @@ -611,43 +513,6 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile } } -#ifdef TESTING_ENABLE - { - goto __DF_TESTING_BLOCK_END; - __DF_TESTING_BLOCK_END: - __asm__("nop"); - } -#endif - -#ifdef DEBUG_ENABLE - { - goto __DF_DEBUG_BLOCK_END; - __DF_SOR_I2C_ERROR: - sprintf((char*)debug_buf, "Unable to send SOR request to %d. I2C error: %ld\r\n", - i2c_addr>>1, HAL_I2C_GetError(&hi2c1)); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); - goto __DF_SOR_I2C_ERROR_END; - __DF_DOC_I2C_ERROR: - sprintf((char*)debug_buf, "Unable to receive DOC. I2C error: %ld\r\n", HAL_I2C_GetError(&hi2c1)); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); - goto __DF_DOC_I2C_ERROR_END; - __DF_CTS_I2C_ERROR: - sprintf((char*)debug_buf, "CTS I2C error: %ld\r\n", HAL_I2C_GetError(&hi2c1)); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); - goto __DF_CTS_I2C_ERROR_END; - __DF_DATA_I2C_ERROR: - sprintf((char*)debug_buf, "Unable to receive data. I2C error: %ld\r\n", HAL_I2C_GetError(&hi2c1)); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); - memset(debug_buf, 0, 128); - goto __DF_DATA_I2C_ERROR_END; - __DF_DEBUG_BLOCK_END: - __asm__("nop"); - } -#endif - return df_status; } @@ -663,12 +528,6 @@ uint8_t get_CTS(uint8_t i2c_addr) #endif uint8_t status = stream.read(CTS_buf, 2, vptr, &stream); - if (status != 0) { -#ifdef DEBUG_ENABLE - sprintf((char*)debug_buf, "Failed to get CTS\r\n"); - HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100); -#endif - } return status; } diff --git a/src/master_posix.c b/src/master_posix.c index 02966bd..83eb741 100644 --- a/src/master_posix.c +++ b/src/master_posix.c @@ -4,7 +4,7 @@ * Actually, target-specific includes should go in the port folder * This should be specified in the master config file * TODO This file should be moved to the posix port folder -*/ + */ /* FreeRTOS includes. */ /* #include "FreeRTOS_POSIX.h" */ @@ -65,9 +65,12 @@ uint32_t data_len_buf[ROUTING_BUFSIZE]; uint32_t data_routing_ptr = 0; /*< Pointer to tail of both data and data index buffers */ uint32_t cmd_routing_ptr = 0; /*< Pointer to tail of cmd and cmd index buffers */ -p_stream_t stream = {&stdio_read, &stdio_write, NULL}; +p_stream_t stream = {&stdio_read, &stdio_write, NULL, NULL}; static void *handshake_func(void * pvArgs); +static void *dataflow_func(void *pvArgs); +static void *routing_func(void *pvArgs); + hs_status_t handshake(uint32_t i2c_addr); dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, uint8_t routing_buf_idx); bool routing(void); @@ -80,291 +83,30 @@ bool encode_subscription_callback(pb_ostream_t *ostream, const pb_field_t *field bool encode_datapoint_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg); bool decode_data_callback(pb_istream_t *istream, const pb_field_t *field, void **args); bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg); -/** - * @brief Control messages. - * - * uint8_t is sufficient for this enum, that we are going to cast to char directly. - * If ever needed, implement a function to properly typecast. - */ -/**@{ */ -typedef enum ControlMessage -{ - eMSG_LOWER_INAVLID = 0x00, /**< Guard, let's not use 0x00 for messages. */ - eWORKER_CTRL_MSG_CONTINUE = 0x01, /**< Dispatcher to worker, distributing another job. */ - eWORKER_CTRL_MSG_EXIT = 0x02, /**< Dispatcher to worker, all jobs are finished and the worker receiving such can exit. */ - - /* define additional messages here */ - - eMSG_UPPER_INVALID = 0xFF /**< Guard, additional tasks shall be defined above. */ -} eControlMessage; -/**@} */ - -/** - * @defgroup Configuration constants for the dispatcher-worker demo. - */ -/**@{ */ -#define MQUEUE_NUMBER_OF_WORKERS ( 2 ) /**< The number of worker threads, each thread has one queue which is used as income box. */ - -#if ( MQUEUE_NUMBER_OF_WORKERS > 10 ) -#error "Please keep MQUEUE_NUMBER_OF_WORKERS < 10." -#endif - -#define MQUEUE_WORKER_QNAME_BASE "/qNode0" /**< Queue name base. */ -#define MQUEUE_WORKER_QNAME_BASE_LEN ( 6 ) /** Queue name base length. */ - -#define MQUEUE_TIMEOUT_SECONDS ( 1 ) /**< Relative timeout for mqueue functions. */ -#define MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER ( 1 ) /**< Maximum number of messages in a queue. */ - -#define MQUEUE_MSG_WORKER_CTRL_MSG_SIZE sizeof( uint8_t ) /**< Control message size. */ -#define DEMO_ERROR ( -1 ) /**< Any non-zero value would work. */ -/**@} */ - -/** - * @brief Structure used by Worker thread. - */ -/**@{ */ -typedef struct WorkerThreadResources -{ - pthread_t pxID; /**< thread ID. */ - mqd_t xInboxID; /**< mqueue inbox ID. */ -} WorkerThreadResources_t; -/**@} */ - -/** - * @brief Structure used by Dispatcher thread. - */ -/**@{ */ -typedef struct DispatcherThreadResources -{ - pthread_t pxID; /**< thread ID. */ - mqd_t * pOutboxID; /**< a list of mqueue outbox ID. */ -} DispatcherThreadResources_t; -/**@} */ - -/*-----------------------------------------------------------*/ - -static void * prvWorkerThread( void * pvArgs ) -{ - WorkerThreadResources_t pArgList = *( WorkerThreadResources_t * ) pvArgs; - - printf( "Worker thread #[%d] - start %s", ( int ) pArgList.pxID, LINE_BREAK ); - - struct timespec xReceiveTimeout = { 0 }; - - ssize_t xMessageSize = 0; - char pcReceiveBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 }; - - /* This is a worker thread that reacts based on what is sent to its inbox (mqueue). */ - while( true ) - { - clock_gettime( CLOCK_REALTIME, &xReceiveTimeout ); - xReceiveTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS; - - xMessageSize = mq_receive( pArgList.xInboxID, - pcReceiveBuffer, - MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, - 0 ); - - /* Parse messages */ - if( xMessageSize == MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ) - { - switch( ( int ) pcReceiveBuffer[ 0 ] ) - { - case eWORKER_CTRL_MSG_CONTINUE: - /* Task branch, currently only prints message to screen. */ - /* Could perform tasks here. Could also notify dispatcher upon completion, if desired. */ - printf( "Worker thread #[%d] -- Received eWORKER_CTRL_MSG_CONTINUE %s", ( int ) pArgList.pxID, LINE_BREAK ); - break; - - case eWORKER_CTRL_MSG_EXIT: - printf( "Worker thread #[%d] -- Finished. Exit now. %s", ( int ) pArgList.pxID, LINE_BREAK ); - - return NULL; - - default: - /* Received a message that we don't care or not defined. */ - break; - } - } - else - { - /* Invalid message. Error handling can be done here, if desired. */ - } - } - - /* You should never hit here. */ - /* return NULL; */ -} - -/*-----------------------------------------------------------*/ - -static void * prvDispatcherThread( void * pvArgs ) -{ - DispatcherThreadResources_t pArgList = *( DispatcherThreadResources_t * ) pvArgs; - - printf( "Dispatcher thread - start %s", LINE_BREAK ); - - struct timespec xSendTimeout = { 0 }; - - ssize_t xMessageSize = 0; - char pcSendBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 }; - - /* Just for fun, let threads do a total of 100 independent tasks. */ - int i = 0; - const int totalNumOfJobsPerThread = 100; - - /* Distribute 1000 independent tasks to workers, in round-robin fashion. */ - pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_CONTINUE; - - for( i = 0; i < totalNumOfJobsPerThread; i++ ) - { - clock_gettime( CLOCK_REALTIME, &xSendTimeout ); - xSendTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS; - - printf( "Dispatcher iteration #[%d] -- Sending msg to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK ); - - xMessageSize = mq_timedsend( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], - pcSendBuffer, - MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, - 0, - &xSendTimeout ); - - if( xMessageSize != 0 ) - { - /* This error is acceptable in our setup. - * Since inbox for each thread fits only one message. - * In reality, balance inbox size, message arrival rate, and message drop rate. */ - printf( "An acceptable failure -- dispatcher failed to send eWORKER_CTRL_MSG_CONTINUE to outbox ID: %x. errno %d %s", - ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], errno, LINE_BREAK ); - } - } - - /* Control thread is now done with distributing jobs. Tell workers they are done. */ - pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_EXIT; - for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) - { - printf( "Dispatcher [%d] -- Sending eWORKER_CTRL_MSG_EXIT to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK ); - - /* This is a blocking call, to guarantee worker thread exits. */ - xMessageSize = mq_send( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], - pcSendBuffer, - MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, - 0 ); - } - - return NULL; -} /*-----------------------------------------------------------*/ /** - * @brief Job distribution with actor model. + * @brief Main function; * * See the top of this file for detailed description. */ void vStartPOSIXMaster(void *pvParams) { - int i = 0; - int iStatus = 0; - - pthread_t handshake_thread; - pthread_create(&handshake_thread, NULL, handshake_func, NULL); + pthread_t handshake_thread, dataflow_thread, routing_thread; - /* Remove warnings about unused parameters. */ + pthread_create(&handshake_thread, NULL, handshake_func, NULL); + pthread_create(&dataflow_thread, NULL, dataflow_func, NULL); + pthread_create(&routing_thread, NULL, routing_func, NULL); - /* Handles of the threads and related resources. */ - DispatcherThreadResources_t pxDispatcher = { 0 }; - WorkerThreadResources_t pxWorkers[ MQUEUE_NUMBER_OF_WORKERS ] = { { 0 } }; - mqd_t workerMqueues[ MQUEUE_NUMBER_OF_WORKERS ] = { 0 }; - struct mq_attr xQueueAttributesWorker = - { - .mq_flags = 0, - .mq_maxmsg = MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER, - .mq_msgsize = MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, - .mq_curmsgs = 0 - }; + /* Add device-specific stream/thread declerations here, if needed */ + /* ... */ - pxDispatcher.pOutboxID = workerMqueues; - - /* Create message queues for each worker thread. */ - for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) - { - /* Prepare a unique queue name for each worker. */ - char qName[] = MQUEUE_WORKER_QNAME_BASE; - qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i; - - /* Open a queue with -- - * O_CREAT -- create a message queue. - * O_RDWR -- both receiving and sending messages. - */ - pxWorkers[ i ].xInboxID = mq_open( qName, - O_CREAT | O_RDWR, - ( mode_t ) 0, - &xQueueAttributesWorker ); - - if( pxWorkers[ i ].xInboxID == ( mqd_t ) -1 ) - { - printf( "Invalid inbox (mqueue) for worker. %s", LINE_BREAK ); - iStatus = DEMO_ERROR; - break; - } - - /* Outboxes of dispatcher thread is the inboxes of all worker threads. */ - pxDispatcher.pOutboxID[ i ] = pxWorkers[ i ].xInboxID; - } - - /* Create and start Worker threads. */ - if( iStatus == 0 ) - { - for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) - { - ( void ) pthread_create( &( pxWorkers[ i ].pxID ), NULL, prvWorkerThread, &pxWorkers[ i ] ); - } - - /* Create and start dispatcher thread. */ - ( void ) pthread_create( &( pxDispatcher.pxID ), NULL, prvDispatcherThread, &pxDispatcher ); - - /* Actors will do predefined tasks in threads. Current implementation is that - * dispatcher actor notifies worker actors to terminate upon finishing distributing tasks. */ - - /* Wait for worker threads to join. */ - for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) - { - ( void ) pthread_join( pxWorkers[ i ].pxID, NULL ); - } - - /* Wait for dispatcher thread to join. */ - ( void ) pthread_join( pxDispatcher.pxID, NULL ); - } - - /* Close and unlink worker message queues. */ - for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ ) - { - char qName[] = MQUEUE_WORKER_QNAME_BASE; - qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i; - - if( pxWorkers[ i ].xInboxID != NULL ) - { - ( void ) mq_close( pxWorkers[ i ].xInboxID ); - ( void ) mq_unlink( qName ); - } - } - - /* Have something on console. */ - if( iStatus == 0 ) - { - printf( "All threads finished. %s", LINE_BREAK ); - } - else - { - printf( "Queues did not get initialized properly. Did not run demo. %s", LINE_BREAK ); - } - /* This task was created with the native xTaskCreate() API function, so must not run off the end of its implementing thread. */ - /* vTaskDelete( NULL ); */ + /* vTaskDelete(NULL); */ } static void *handshake_func(void * pvArgs) @@ -383,7 +125,29 @@ static void *handshake_func(void * pvArgs) return NULL; } -hs_status_t handshake(uint32_t device_id) +static void *dataflow_func(void *pvArgs) +{ + printf("Dataflow thread started %s", LINE_BREAK); + for (;;) { + for (int device_idx = 0; device_idx < BUS_DEVICE_LIMIT-1; device_idx++) { + if (dev_sts[device_idx] == REGISTERED) { + device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_TX, 0); + } + } + } + return NULL; +} + +static void *routing_func(void *pvArgs) +{ + printf("Routing thread started %s", LINE_BREAK); + for (;;) { + routing(); + cmd_routing(); + } + return NULL; +} +hs_status_t handshake(uint32_t i2c_addr) { /* Handshake variables */ @@ -454,10 +218,10 @@ hs_status_t handshake(uint32_t device_id) hs_sts = HS_FAILED; } else { - device_info[dev_idx] = malloc(sizeof(device_info_t)); - device_info[dev_idx]->i2c_addr = i2c_addr; - device_info[dev_idx]->device_id = dev_idx; - device_info[dev_idx]->MDR = MDR_res_message; + device_info[dev_idx] = malloc(sizeof(device_info_t)); + device_info[dev_idx]->i2c_addr = i2c_addr; + device_info[dev_idx]->device_id = dev_idx; + device_info[dev_idx]->MDR = MDR_res_message; hs_sts = HS_REGISTERED; } @@ -469,6 +233,264 @@ hs_status_t handshake(uint32_t device_id) return hs_sts; } +dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile uint8_t rbuf_idx) +{ + uint8_t dev_idx = GET_IDX_FROM_ADDR(i2c_addr); + dataflow_status_t df_status = DF_IDLE; + uint8_t CTS_buf[2] = {0x2, 0xFF}; + + uint8_t DOC_buf[4]; + uint8_t cmd_dest; + + uint32_t data_len = 0; + + void **vptr = malloc(sizeof(uint8_t*)); + vptr[0] = malloc(sizeof(uint8_t*)); + vptr[0] = &i2c_addr; + + /* TODO Add default values to the CTS message in proto */ +#if defined(TESTING_ENABLE) || defined(DEBUG_ENABLE) + uint8_t debug_buf[128]={0}; +#endif + + while (df_status != DF_SUCCESS && df_status != DF_FAIL) { + switch (df_status) { + case (DF_IDLE): + { + uint8_t SOR_buf[2] = {SOR_code, 0x0}; + if (stream.write(SOR_buf, 2, vptr, &stream) != 0) { + df_status = DF_FAIL; + break; + } + else { + if (SOR_code == SLAVE_TX) { + df_status = DF_RX_DOC; + } + else if (SOR_code == SLAVE_RX_DATAPOINT) { + df_status = DF_TX_DATA; + } + else if (SOR_code == SLAVE_RX_COMMAND) { + df_status = DF_TX_CMD; + } + } + break; + } + + case (DF_RX_DOC): + { + if (stream.read(DOC_buf, 4, vptr, &stream) != 0) { + df_status = DF_FAIL; + break; + } + else { + if (DOC_buf[0] == 0x0) { + /* Do nothing DOC; should become redundant once dataflow is initiated using classes */ + df_status = DF_SUCCESS; + } + else if (DOC_buf[1] == DATA) { + df_status = DF_CTS; + data_len = DOC_buf[3]; + } + else if (DOC_buf[1] == CMD_UNICAST) { + df_status = DF_CTS; + cmd_dest = DOC_buf[0]; + data_len = DOC_buf[3]; + } + else if (DOC_buf[1] == CMD_MULTICAST) { + /* TODO */ + } + else if (DOC_buf[1] == CMD_BROADCAST) { + /* TODO */ + } + else { + df_status = DF_FAIL; + } + } + break; + } + + case (DF_CTS): + { + if (stream.write(CTS_buf, 2, vptr, &stream) != 0) { + df_status = DF_FAIL; + } + else { + if (DOC_buf[1] == DATA) { + df_status = DF_RX_DATA; + } + else { + if (DOC_buf[1] == CMD_UNICAST) { + df_status = DF_RX_CMD; + } + } + } + break; + } + + case (DF_RX_DATA): + { + data_rbuf[data_routing_ptr] = malloc(sizeof(uint8_t)*data_len); + if (stream.read(data_rbuf[data_routing_ptr], data_len, vptr, &stream) != 0) { + df_status = DF_FAIL; + free(data_rbuf[data_routing_ptr]); + break; + } + else { + data_src_idx_rbuf[data_routing_ptr] = dev_idx; + data_len_buf[data_routing_ptr] = (uint8_t)data_len; + data_routing_ptr++; + df_status = DF_SUCCESS; + } + break; + } + + case (DF_TX_DATA): + { + /* TODO error checking */ + /* Will need to package datapoint and MDR to know their lengths + Once cached, will not need to do this */ + + /* Do this after handshake to cache ==================================== */ + uint8_t MDR_buf[128]; + uint8_t src_device_idx = data_src_idx_rbuf[rbuf_idx]; + s2m_MDR_response data_src_MDR = device_info[src_device_idx]->MDR; + pb_ostream_t MDR_ostream = pb_ostream_from_buffer(MDR_buf, sizeof(MDR_buf)); + data_src_MDR.subscriptions.funcs.encode=master_encode_MDR_callback; + pb_encode(&MDR_ostream, s2m_MDR_response_fields, &data_src_MDR); + uint8_t MDR_len = MDR_ostream.bytes_written; + /* ==================================================================== */ + + uint8_t data_len = data_len_buf[rbuf_idx]; + uint8_t data_MDR_len_buf[4] = {0, MDR_len, 0, data_len}; + + uint8_t status = get_CTS(i2c_addr); + + if (status != 0 || + stream.write(data_MDR_len_buf, 4, vptr, &stream) != 0); + + status = get_CTS(i2c_addr); + if (status == 0 && + stream.write(MDR_buf, MDR_len, vptr, &stream) == 0 && + stream.write(data_rbuf[rbuf_idx], data_len, vptr, &stream) == 0) { + df_status = DF_SUCCESS; + } + else { + df_status = DF_FAIL; + } + break; + } + + case (DF_RX_CMD): + { + uint8_t *cmd_buf; + cmd_buf = (uint8_t*)malloc(data_len); + + if (stream.read(cmd_buf, data_len, vptr, &stream) != 0) { + free(cmd_buf); + df_status = DF_FAIL; + } + else { + cmd_routing_buf[cmd_routing_ptr] = malloc(sizeof(uint8_t)*data_len); + memcpy(cmd_routing_buf[cmd_routing_ptr], cmd_buf, data_len); + free(cmd_buf); + + cmd_src_idx_rbuf[cmd_routing_ptr] = dev_idx; + cmd_dst_idx_rbuf[cmd_routing_ptr] = cmd_dest; + cmd_routing_ptr++; + + df_status = DF_SUCCESS; + } + break; + } + + case (DF_TX_CMD): + { + uint8_t data_len = sizeof(cmd_routing_buf[rbuf_idx]); + uint8_t len_buf[] = {0x0, data_len, 0x0, 0x0}; + + uint8_t status = get_CTS(i2c_addr); + if (status == 0 && + stream.write(len_buf, 4, vptr, &stream) == 0) { + df_status = DF_FAIL; + } + + if (df_status != DF_FAIL) { + uint8_t status = get_CTS(i2c_addr); + if (status == 0 && + stream.write(cmd_routing_buf[rbuf_idx], data_len, vptr, &stream) == 0) { + df_status = DF_SUCCESS; + } + else { + df_status = DF_FAIL; + } + } + break; + } + case DF_SUCCESS: + case DF_FAIL: + break; + } + } + + return df_status; +} + +bool routing(void) +{ + /* This table holds information on where to send each datapoint in the routing buffer */ + uint32_t routing_table[ROUTING_BUFSIZE][4] = {{0, 0}}; + + /* Build table with routing information */ + for (uint8_t rbuf_data_idx = 0; rbuf_data_idx < data_routing_ptr; rbuf_data_idx++) { + uint8_t src_module_idx = data_src_idx_rbuf[rbuf_data_idx]; + for (uint8_t dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT; dev_idx++) { + if (!(GET_BIT_FROM_IDX(allocated, dev_idx) && 1)) { // No module at this index + continue; + } + bool alloc = false; + for (uint8_t dev_sub_idx = 0; + dev_sub_idx < subs_info[dev_idx]->mod_idx && !alloc; dev_sub_idx++) { + if (subs_info[dev_idx]->module_ids[dev_sub_idx] == + device_info[src_module_idx]->MDR.module_id) { + SET_BIT_FROM_IDX(routing_table[rbuf_data_idx], dev_idx); + alloc = true; + } + } + /* TODO entity ID, I2C addr and class routing, should go in the if condition above */ + } + } + + for (uint8_t rbuf_data_idx = 0; rbuf_data_idx < data_routing_ptr; rbuf_data_idx++) { + for (uint8_t device_idx = 0; device_idx < BUS_DEVICE_LIMIT; device_idx++) { + if (GET_BIT_FROM_IDX(allocated, device_idx) && + GET_BIT_FROM_IDX(routing_table[rbuf_data_idx], device_idx)) { + device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_RX_DATAPOINT, rbuf_data_idx); + } + } + free(data_rbuf[rbuf_data_idx]); + } + + /* Reset the routing pointer, since all data in buffer should have been routed */ + data_routing_ptr = 0; + return true; +} + +bool cmd_routing(void) +{ + /* uint32_t routing_table[4]; */ + for (uint8_t rbuf_cmd_idx = 0; rbuf_cmd_idx < cmd_routing_ptr; rbuf_cmd_idx++) { + uint8_t dst_module_idx = cmd_dst_idx_rbuf[rbuf_cmd_idx]; + for (int dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT; dev_idx++) { + if (GET_BIT_FROM_IDX(allocated, dev_idx) && + device_info[dev_idx]->MDR.module_id == dst_module_idx) { + device_dataflow(GET_ADDR_FROM_IDX(dev_idx), SLAVE_RX_COMMAND, rbuf_cmd_idx); + } + } + free(cmd_routing_buf[rbuf_cmd_idx]); + } + cmd_routing_ptr = 0; + return true; +} bool todo_hs_or_not_todo_hs(uint8_t i2c_addr) { @@ -542,3 +564,28 @@ bool decode_subscriptions_callback(pb_istream_t *istream, const pb_field_t *fiel subs.i2c_address; return true; } + +bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg) +{ + if (!pb_encode_tag_for_field(ostream, field)) { + return false; + } + return true; +} + +uint8_t get_CTS(uint8_t i2c_addr) +{ + uint8_t CTS_buf[2]; + + void **vptr = malloc(sizeof(uint8_t*)); + vptr[0] = malloc(sizeof(uint8_t*)); + vptr[0] = &i2c_addr; + + uint8_t status = stream.read(CTS_buf, 2, vptr, &stream); + if (status == 0 && CTS_buf[1] == 0x1) { + return 0; + } + else { + return 1; + } +} |
