summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAditya Naik2020-08-03 18:11:57 -0400
committerAditya Naik2020-08-03 18:11:57 -0400
commit5154fdecac5260ea3a2fd6416c1186eb4d609449 (patch)
tree91df6c6955a6fb11706920cd5d922d834c170bc4
parent8e1133fcf30fbb4e1d406c47cb87d592c9742c4f (diff)
Added all functions into threads; cleaup
-rw-r--r--include/main.h49
-rw-r--r--include/stream.h7
-rw-r--r--ports/posix/src/stream_stdio.c13
-rw-r--r--ports/stm32f4/src/port.h3
-rw-r--r--ports/stm32f4/src/stream_i2c.c2
-rw-r--r--src/master.c211
-rw-r--r--src/master_posix.c599
7 files changed, 381 insertions, 503 deletions
diff --git a/include/main.h b/include/main.h
index 6579484..60fe0a2 100644
--- a/include/main.h
+++ b/include/main.h
@@ -1,25 +1,11 @@
-/* USER CODE BEGIN Header */
/**
******************************************************************************
* @file : main.h
* @brief : Header for main.c file.
* This file contains the common defines of the application.
******************************************************************************
- * @attention
- *
- * <h2><center>&copy; Copyright (c) 2020 STMicroelectronics.
- * All rights reserved.</center></h2>
- *
- * This software component is licensed by ST under BSD 3-Clause license,
- * the "License"; You may not use this file except in compliance with the
- * License. You may obtain a copy of the License at:
- * opensource.org/licenses/BSD-3-Clause
- *
- ******************************************************************************
*/
-/* USER CODE END Header */
-/* Define to prevent recursive inclusion -------------------------------------*/
#ifndef __MAIN_H
#define __MAIN_H
@@ -27,43 +13,12 @@
extern "C" {
#endif
-/* Includes ------------------------------------------------------------------*/
#include "port.h"
-/* Private includes ----------------------------------------------------------*/
-/* USER CODE BEGIN Includes */
-
-/* USER CODE END Includes */
-
-/* Exported types ------------------------------------------------------------*/
-/* USER CODE BEGIN ET */
-
-/* USER CODE END ET */
-
-/* Exported constants --------------------------------------------------------*/
-/* USER CODE BEGIN EC */
-
-/* USER CODE END EC */
-
-/* Exported macro ------------------------------------------------------------*/
-/* USER CODE BEGIN EM */
-
-/* USER CODE END EM */
-
-/* Exported functions prototypes ---------------------------------------------*/
void Error_Handler(void);
-/* USER CODE BEGIN EFP */
-
-/* USER CODE END EFP */
-
-/* Private defines -----------------------------------------------------------*/
-#define led_Pin GPIO_PIN_13
-#define led_GPIO_Port GPIOC
-/* USER CODE BEGIN Private defines */
-
-/* USER CODE END Private defines */
-
+#define BUS_DEVICE_LIMIT 16
+
#ifdef __cplusplus
}
#endif
diff --git a/include/stream.h b/include/stream.h
index 6bdc29e..96d9823 100644
--- a/include/stream.h
+++ b/include/stream.h
@@ -1,10 +1,11 @@
#include <stdint.h>
#include <stddef.h>
-typedef struct p_stream_s p_stream_t;
+/* typedef struct p_stream_s p_stream_t; */
-struct p_stream_s {
+typedef struct {
int (*read)(uint8_t *buf, size_t len, void **vptr, void *sptr);
int (*write)(uint8_t *buf, size_t len, void **vptr, void *sptr);
+ int (*init)(void **vptr, void *sptr);
void **props;
-};
+} p_stream_t;
diff --git a/ports/posix/src/stream_stdio.c b/ports/posix/src/stream_stdio.c
index ed83e7b..e9b6f09 100644
--- a/ports/posix/src/stream_stdio.c
+++ b/ports/posix/src/stream_stdio.c
@@ -1,6 +1,19 @@
#include <stdio.h>
+#include <stdlib.h>
#include "stream_stdio.h"
#include "stream.h"
+#include "main.h"
+
+int stdio_init(void **vptr, void *sptr)
+{
+ /* TODO */
+ /* p_stream_t *stream = (p_stream_t*)sptr; */
+ /* int num_files = BUS_DEVICE_LIMIT; */
+ /* int x; */
+ /* char fname_base[] = "devio_", fname[10]; */
+
+ return 0;
+}
int stdio_read(uint8_t* buf, size_t count, void **vptr, void *sptr)
{
diff --git a/ports/stm32f4/src/port.h b/ports/stm32f4/src/port.h
index ffbbe9e..cb55268 100644
--- a/ports/stm32f4/src/port.h
+++ b/ports/stm32f4/src/port.h
@@ -9,4 +9,7 @@
#include "stm32f4xx_hal.h"
+#define led_Pin GPIO_PIN_13
+#define led_GPIO_Port GPIOC
+
#endif
diff --git a/ports/stm32f4/src/stream_i2c.c b/ports/stm32f4/src/stream_i2c.c
index 3e8fc95..8c6c65b 100644
--- a/ports/stm32f4/src/stream_i2c.c
+++ b/ports/stm32f4/src/stream_i2c.c
@@ -19,7 +19,7 @@ int i2c_read(uint8_t* buf, size_t count, void **vptr, void *sptr)
else if (++AF_counter > AF_limit) {
return HAL_I2C_ERROR_AF;
}
- }
+ }
return 0;
}
diff --git a/src/master.c b/src/master.c
index 99cbea5..969fca5 100644
--- a/src/master.c
+++ b/src/master.c
@@ -36,7 +36,7 @@
#define COUNTOF(__BUFFER__) (sizeof(__BUFFER__) / sizeof(*(__BUFFER__)))
/* #define I2C_ADDRESS 0x05 */
-#define BUS_DEVICE_LIMIT 16
+
/* Macro to toggle between master and slave firmware */
#define MASTER
@@ -160,7 +160,7 @@ hs_status_t handshake(uint32_t i2c_addr)
uint32_t dev_idx = GET_IDX_FROM_ADDR(i2c_addr);
uint16_t MDR_len = 0;
- void **vptr = malloc(sizeof(uint8_t*)*2);
+ void **vptr = malloc(sizeof(uint8_t*));
vptr[0] = malloc(sizeof(uint8_t*));
vptr[0] = &i2c_addr;
@@ -338,15 +338,10 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile
switch (df_status) {
case (DF_IDLE):
{
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
uint8_t SOR_buf[2] = {SOR_code, 0x0};
- if (HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, SOR_buf, 2, 500) != HAL_OK) {
+ if (stream.write(SOR_buf, 2, vptr, &stream) != 0) {
df_status = DF_FAIL;
-#ifdef DEBUG_ENABLE
- goto __DF_SOR_I2C_ERROR;
- __DF_SOR_I2C_ERROR_END:
- __asm__("nop");
-#endif
+ break;
}
else {
if (SOR_code == SLAVE_TX) {
@@ -361,29 +356,14 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile
}
break;
}
-
case (DF_RX_DOC):
{
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
- AF_error_counter = 0;
- while (HAL_I2C_Master_Receive(&hi2c1, (uint16_t)i2c_addr,
- (uint8_t*)DOC_buf, 4, 0xffff) != HAL_OK) {
- if (HAL_I2C_GetError(&hi2c1) != HAL_I2C_ERROR_AF) {
- df_status = DF_FAIL;
-#ifdef DEBUG_ENABLE
- goto __DF_DOC_I2C_ERROR;
- __DF_DOC_I2C_ERROR_END:
- __asm__("nop");
-#endif
- break;
- }
- else if (++AF_error_counter > 1500) {
- df_status = DF_FAIL;
- break;
- }
+ if (stream.read(DOC_buf, 4, vptr, &stream) != 0) {
+ df_status = DF_FAIL;
+ break;
}
- if (df_status != DF_FAIL) {
+ else {
if (DOC_buf[0] == 0x0) {
/* Do nothing DOC; should become redundant once dataflow is initiated using classes */
df_status = DF_SUCCESS;
@@ -409,16 +389,11 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile
}
break;
}
+
case (DF_CTS):
{
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
- if (HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, CTS_buf, 2, 1000) != HAL_OK) {
+ if (stream.write(CTS_buf, 2, vptr, &stream) != 0) {
df_status = DF_FAIL;
-#ifdef DEBUG_ENABLE
- goto __DF_CTS_I2C_ERROR;
- __DF_CTS_I2C_ERROR_END:
- __asm__("nop");
-#endif
}
else {
if (DOC_buf[1] == DATA) {
@@ -432,42 +407,26 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile
}
break;
}
+
case (DF_RX_DATA):
{
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
data_rbuf[data_routing_ptr] = malloc(sizeof(uint8_t)*data_len);
- AF_error_counter = 0;
- while (HAL_I2C_Master_Receive(&hi2c1, (uint16_t)i2c_addr,
- (uint8_t*)data_rbuf[data_routing_ptr],
- data_len, 1000) != HAL_OK) {
- if (HAL_I2C_GetError(&hi2c1) != HAL_I2C_ERROR_AF) {
- df_status = DF_FAIL;
-#ifdef DEBUG_ENABLE
- goto __DF_DATA_I2C_ERROR;
- __DF_DATA_I2C_ERROR_END:
- __asm__("nop");
-#endif
- break;
- }
- else if (++AF_error_counter > 1500) {
- df_status = DF_FAIL;
- break;
- }
+ if (stream.read(data_rbuf[data_routing_ptr], data_len, vptr, &stream) != 0) {
+ df_status = DF_FAIL;
+ free(data_rbuf[data_routing_ptr]);
+ break;
}
- if (df_status != DF_FAIL) {
+ else {
data_src_idx_rbuf[data_routing_ptr] = dev_idx;
data_len_buf[data_routing_ptr] = (uint8_t)data_len;
data_routing_ptr++;
df_status = DF_SUCCESS;
}
- else {
- free(data_rbuf[data_routing_ptr]);
- }
break;
}
+
case (DF_TX_DATA):
{
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
/* TODO error checking */
/* Will need to package datapoint and MDR to know their lengths
Once cached, will not need to do this */
@@ -486,66 +445,32 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile
uint8_t data_MDR_len_buf[4] = {0, MDR_len, 0, data_len};
uint8_t status = get_CTS(i2c_addr);
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
- if (status != 0 &&
- HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr,
- data_MDR_len_buf, 4, 10000) == HAL_OK) {
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "MDR len: %d data len: %d SENT\r\n", MDR_len, data_len);
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
- }
- else {
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Failed to send lengths\r\n");
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
- }
+
+ if (status == 0 ||
+ stream.write(data_MDR_len_buf, 4, vptr, &stream) != 0);
+
status = get_CTS(i2c_addr);
if (status != 0 &&
- HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, MDR_buf,
- MDR_len, 10000) == HAL_OK &&
- HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, data_rbuf[rbuf_idx],
- data_len, 10000) == HAL_OK) {
+ stream.write(MDR_buf, MDR_len, vptr, &stream) == 0 &&
+ stream.write(data_rbuf[rbuf_idx], data_len) == 0) {
df_status = DF_SUCCESS;
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Data and MDR sent\r\n");
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
}
else {
df_status = DF_FAIL;
}
break;
}
+
case (DF_RX_CMD):
{
- uint8_t *cmd_buf;
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
+ uint8_t *cmd_buf;
cmd_buf = (uint8_t*)malloc(data_len);
- AF_error_counter = 0;
- while (HAL_I2C_Master_Receive(&hi2c1, (uint16_t)i2c_addr, cmd_buf,
- data_len, 10000) != HAL_OK) {
- if (HAL_I2C_GetError(&hi2c1) != HAL_I2C_ERROR_AF) {
- df_status = DF_FAIL;
- }
- if (++AF_error_counter > 3000) {
- df_status = DF_FAIL;
- }
- if (df_status == DF_FAIL) {
- free(cmd_buf);
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Failed to get command, expected len: %ld\r\n", data_len);
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
- break;
- }
+
+ if (stream.read(cmd_buf, data_len, vptr, &stream) != 0) {
+ free(cmd_buf);
+ df_status = DF_FAIL;
}
- if (df_status != DF_FAIL) {
+ else {
cmd_routing_buf[cmd_routing_ptr] = malloc(sizeof(uint8_t)*data_len);
memcpy(cmd_routing_buf[cmd_routing_ptr], cmd_buf, data_len);
free(cmd_buf);
@@ -553,54 +478,31 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile
cmd_src_idx_rbuf[cmd_routing_ptr] = dev_idx;
cmd_dst_idx_rbuf[cmd_routing_ptr] = cmd_dest;
cmd_routing_ptr++;
-
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Got command\r\n");
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
+
df_status = DF_SUCCESS;
}
break;
}
+
case (DF_TX_CMD):
{
- HAL_Delay(MASTER_I2C_BUS_INTERVAL);
uint8_t data_len = sizeof(cmd_routing_buf[rbuf_idx]);
uint8_t len_buf[] = {0x0, data_len, 0x0, 0x0};
- uint8_t status = get_CTS(i2c_addr);
+ uint8_t status = get_CTS(i2c_addr);
if (status != 0 &&
- HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr, len_buf,
- 4, 10000) != HAL_OK) {
+ stream.write(len_buf, 4, vptr, &stream) == 0) {
df_status = DF_FAIL;
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Failed to send cmd len buf to %d\r\n", i2c_addr>>1);
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
}
if (df_status != DF_FAIL) {
uint8_t status = get_CTS(i2c_addr);
if (status != 0 &&
- HAL_I2C_Master_Transmit(&hi2c1, (uint16_t)i2c_addr,
- cmd_routing_buf[rbuf_idx],
- 4, 10000) == HAL_OK) {
+ stream.write(cmd_routing_buf[rbuf_idx], vptr, &stream) == 0) {
df_status = DF_SUCCESS;
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Routed cmd to %d\r\n", i2c_addr>>1);
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
}
else {
df_status = DF_FAIL;
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Failed to send cmd to %d\r\n", i2c_addr>>1);
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
-#endif
}
}
break;
@@ -611,43 +513,6 @@ dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile
}
}
-#ifdef TESTING_ENABLE
- {
- goto __DF_TESTING_BLOCK_END;
- __DF_TESTING_BLOCK_END:
- __asm__("nop");
- }
-#endif
-
-#ifdef DEBUG_ENABLE
- {
- goto __DF_DEBUG_BLOCK_END;
- __DF_SOR_I2C_ERROR:
- sprintf((char*)debug_buf, "Unable to send SOR request to %d. I2C error: %ld\r\n",
- i2c_addr>>1, HAL_I2C_GetError(&hi2c1));
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
- goto __DF_SOR_I2C_ERROR_END;
- __DF_DOC_I2C_ERROR:
- sprintf((char*)debug_buf, "Unable to receive DOC. I2C error: %ld\r\n", HAL_I2C_GetError(&hi2c1));
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
- goto __DF_DOC_I2C_ERROR_END;
- __DF_CTS_I2C_ERROR:
- sprintf((char*)debug_buf, "CTS I2C error: %ld\r\n", HAL_I2C_GetError(&hi2c1));
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
- goto __DF_CTS_I2C_ERROR_END;
- __DF_DATA_I2C_ERROR:
- sprintf((char*)debug_buf, "Unable to receive data. I2C error: %ld\r\n", HAL_I2C_GetError(&hi2c1));
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
- memset(debug_buf, 0, 128);
- goto __DF_DATA_I2C_ERROR_END;
- __DF_DEBUG_BLOCK_END:
- __asm__("nop");
- }
-#endif
-
return df_status;
}
@@ -663,12 +528,6 @@ uint8_t get_CTS(uint8_t i2c_addr)
#endif
uint8_t status = stream.read(CTS_buf, 2, vptr, &stream);
- if (status != 0) {
-#ifdef DEBUG_ENABLE
- sprintf((char*)debug_buf, "Failed to get CTS\r\n");
- HAL_UART_Transmit(&huart1, debug_buf, sizeof(debug_buf), 100);
-#endif
- }
return status;
}
diff --git a/src/master_posix.c b/src/master_posix.c
index 02966bd..83eb741 100644
--- a/src/master_posix.c
+++ b/src/master_posix.c
@@ -4,7 +4,7 @@
* Actually, target-specific includes should go in the port folder
* This should be specified in the master config file
* TODO This file should be moved to the posix port folder
-*/
+ */
/* FreeRTOS includes. */
/* #include "FreeRTOS_POSIX.h" */
@@ -65,9 +65,12 @@ uint32_t data_len_buf[ROUTING_BUFSIZE];
uint32_t data_routing_ptr = 0; /*< Pointer to tail of both data and data index buffers */
uint32_t cmd_routing_ptr = 0; /*< Pointer to tail of cmd and cmd index buffers */
-p_stream_t stream = {&stdio_read, &stdio_write, NULL};
+p_stream_t stream = {&stdio_read, &stdio_write, NULL, NULL};
static void *handshake_func(void * pvArgs);
+static void *dataflow_func(void *pvArgs);
+static void *routing_func(void *pvArgs);
+
hs_status_t handshake(uint32_t i2c_addr);
dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, uint8_t routing_buf_idx);
bool routing(void);
@@ -80,291 +83,30 @@ bool encode_subscription_callback(pb_ostream_t *ostream, const pb_field_t *field
bool encode_datapoint_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg);
bool decode_data_callback(pb_istream_t *istream, const pb_field_t *field, void **args);
bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg);
-/**
- * @brief Control messages.
- *
- * uint8_t is sufficient for this enum, that we are going to cast to char directly.
- * If ever needed, implement a function to properly typecast.
- */
-/**@{ */
-typedef enum ControlMessage
-{
- eMSG_LOWER_INAVLID = 0x00, /**< Guard, let's not use 0x00 for messages. */
- eWORKER_CTRL_MSG_CONTINUE = 0x01, /**< Dispatcher to worker, distributing another job. */
- eWORKER_CTRL_MSG_EXIT = 0x02, /**< Dispatcher to worker, all jobs are finished and the worker receiving such can exit. */
-
- /* define additional messages here */
-
- eMSG_UPPER_INVALID = 0xFF /**< Guard, additional tasks shall be defined above. */
-} eControlMessage;
-/**@} */
-
-/**
- * @defgroup Configuration constants for the dispatcher-worker demo.
- */
-/**@{ */
-#define MQUEUE_NUMBER_OF_WORKERS ( 2 ) /**< The number of worker threads, each thread has one queue which is used as income box. */
-
-#if ( MQUEUE_NUMBER_OF_WORKERS > 10 )
-#error "Please keep MQUEUE_NUMBER_OF_WORKERS < 10."
-#endif
-
-#define MQUEUE_WORKER_QNAME_BASE "/qNode0" /**< Queue name base. */
-#define MQUEUE_WORKER_QNAME_BASE_LEN ( 6 ) /** Queue name base length. */
-
-#define MQUEUE_TIMEOUT_SECONDS ( 1 ) /**< Relative timeout for mqueue functions. */
-#define MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER ( 1 ) /**< Maximum number of messages in a queue. */
-
-#define MQUEUE_MSG_WORKER_CTRL_MSG_SIZE sizeof( uint8_t ) /**< Control message size. */
-#define DEMO_ERROR ( -1 ) /**< Any non-zero value would work. */
-/**@} */
-
-/**
- * @brief Structure used by Worker thread.
- */
-/**@{ */
-typedef struct WorkerThreadResources
-{
- pthread_t pxID; /**< thread ID. */
- mqd_t xInboxID; /**< mqueue inbox ID. */
-} WorkerThreadResources_t;
-/**@} */
-
-/**
- * @brief Structure used by Dispatcher thread.
- */
-/**@{ */
-typedef struct DispatcherThreadResources
-{
- pthread_t pxID; /**< thread ID. */
- mqd_t * pOutboxID; /**< a list of mqueue outbox ID. */
-} DispatcherThreadResources_t;
-/**@} */
-
-/*-----------------------------------------------------------*/
-
-static void * prvWorkerThread( void * pvArgs )
-{
- WorkerThreadResources_t pArgList = *( WorkerThreadResources_t * ) pvArgs;
-
- printf( "Worker thread #[%d] - start %s", ( int ) pArgList.pxID, LINE_BREAK );
-
- struct timespec xReceiveTimeout = { 0 };
-
- ssize_t xMessageSize = 0;
- char pcReceiveBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 };
-
- /* This is a worker thread that reacts based on what is sent to its inbox (mqueue). */
- while( true )
- {
- clock_gettime( CLOCK_REALTIME, &xReceiveTimeout );
- xReceiveTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;
-
- xMessageSize = mq_receive( pArgList.xInboxID,
- pcReceiveBuffer,
- MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
- 0 );
-
- /* Parse messages */
- if( xMessageSize == MQUEUE_MSG_WORKER_CTRL_MSG_SIZE )
- {
- switch( ( int ) pcReceiveBuffer[ 0 ] )
- {
- case eWORKER_CTRL_MSG_CONTINUE:
- /* Task branch, currently only prints message to screen. */
- /* Could perform tasks here. Could also notify dispatcher upon completion, if desired. */
- printf( "Worker thread #[%d] -- Received eWORKER_CTRL_MSG_CONTINUE %s", ( int ) pArgList.pxID, LINE_BREAK );
- break;
-
- case eWORKER_CTRL_MSG_EXIT:
- printf( "Worker thread #[%d] -- Finished. Exit now. %s", ( int ) pArgList.pxID, LINE_BREAK );
-
- return NULL;
-
- default:
- /* Received a message that we don't care or not defined. */
- break;
- }
- }
- else
- {
- /* Invalid message. Error handling can be done here, if desired. */
- }
- }
-
- /* You should never hit here. */
- /* return NULL; */
-}
-
-/*-----------------------------------------------------------*/
-
-static void * prvDispatcherThread( void * pvArgs )
-{
- DispatcherThreadResources_t pArgList = *( DispatcherThreadResources_t * ) pvArgs;
-
- printf( "Dispatcher thread - start %s", LINE_BREAK );
-
- struct timespec xSendTimeout = { 0 };
-
- ssize_t xMessageSize = 0;
- char pcSendBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 };
-
- /* Just for fun, let threads do a total of 100 independent tasks. */
- int i = 0;
- const int totalNumOfJobsPerThread = 100;
-
- /* Distribute 1000 independent tasks to workers, in round-robin fashion. */
- pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_CONTINUE;
-
- for( i = 0; i < totalNumOfJobsPerThread; i++ )
- {
- clock_gettime( CLOCK_REALTIME, &xSendTimeout );
- xSendTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;
-
- printf( "Dispatcher iteration #[%d] -- Sending msg to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK );
-
- xMessageSize = mq_timedsend( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ],
- pcSendBuffer,
- MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
- 0,
- &xSendTimeout );
-
- if( xMessageSize != 0 )
- {
- /* This error is acceptable in our setup.
- * Since inbox for each thread fits only one message.
- * In reality, balance inbox size, message arrival rate, and message drop rate. */
- printf( "An acceptable failure -- dispatcher failed to send eWORKER_CTRL_MSG_CONTINUE to outbox ID: %x. errno %d %s",
- ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], errno, LINE_BREAK );
- }
- }
-
- /* Control thread is now done with distributing jobs. Tell workers they are done. */
- pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_EXIT;
- for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
- {
- printf( "Dispatcher [%d] -- Sending eWORKER_CTRL_MSG_EXIT to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK );
-
- /* This is a blocking call, to guarantee worker thread exits. */
- xMessageSize = mq_send( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ],
- pcSendBuffer,
- MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
- 0 );
- }
-
- return NULL;
-}
/*-----------------------------------------------------------*/
/**
- * @brief Job distribution with actor model.
+ * @brief Main function;
*
* See the top of this file for detailed description.
*/
void vStartPOSIXMaster(void *pvParams)
{
- int i = 0;
- int iStatus = 0;
-
- pthread_t handshake_thread;
- pthread_create(&handshake_thread, NULL, handshake_func, NULL);
+ pthread_t handshake_thread, dataflow_thread, routing_thread;
- /* Remove warnings about unused parameters. */
+ pthread_create(&handshake_thread, NULL, handshake_func, NULL);
+ pthread_create(&dataflow_thread, NULL, dataflow_func, NULL);
+ pthread_create(&routing_thread, NULL, routing_func, NULL);
- /* Handles of the threads and related resources. */
- DispatcherThreadResources_t pxDispatcher = { 0 };
- WorkerThreadResources_t pxWorkers[ MQUEUE_NUMBER_OF_WORKERS ] = { { 0 } };
- mqd_t workerMqueues[ MQUEUE_NUMBER_OF_WORKERS ] = { 0 };
- struct mq_attr xQueueAttributesWorker =
- {
- .mq_flags = 0,
- .mq_maxmsg = MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER,
- .mq_msgsize = MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
- .mq_curmsgs = 0
- };
+ /* Add device-specific stream/thread declerations here, if needed */
+ /* ... */
- pxDispatcher.pOutboxID = workerMqueues;
-
- /* Create message queues for each worker thread. */
- for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
- {
- /* Prepare a unique queue name for each worker. */
- char qName[] = MQUEUE_WORKER_QNAME_BASE;
- qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i;
-
- /* Open a queue with --
- * O_CREAT -- create a message queue.
- * O_RDWR -- both receiving and sending messages.
- */
- pxWorkers[ i ].xInboxID = mq_open( qName,
- O_CREAT | O_RDWR,
- ( mode_t ) 0,
- &xQueueAttributesWorker );
-
- if( pxWorkers[ i ].xInboxID == ( mqd_t ) -1 )
- {
- printf( "Invalid inbox (mqueue) for worker. %s", LINE_BREAK );
- iStatus = DEMO_ERROR;
- break;
- }
-
- /* Outboxes of dispatcher thread is the inboxes of all worker threads. */
- pxDispatcher.pOutboxID[ i ] = pxWorkers[ i ].xInboxID;
- }
-
- /* Create and start Worker threads. */
- if( iStatus == 0 )
- {
- for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
- {
- ( void ) pthread_create( &( pxWorkers[ i ].pxID ), NULL, prvWorkerThread, &pxWorkers[ i ] );
- }
-
- /* Create and start dispatcher thread. */
- ( void ) pthread_create( &( pxDispatcher.pxID ), NULL, prvDispatcherThread, &pxDispatcher );
-
- /* Actors will do predefined tasks in threads. Current implementation is that
- * dispatcher actor notifies worker actors to terminate upon finishing distributing tasks. */
-
- /* Wait for worker threads to join. */
- for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
- {
- ( void ) pthread_join( pxWorkers[ i ].pxID, NULL );
- }
-
- /* Wait for dispatcher thread to join. */
- ( void ) pthread_join( pxDispatcher.pxID, NULL );
- }
-
- /* Close and unlink worker message queues. */
- for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
- {
- char qName[] = MQUEUE_WORKER_QNAME_BASE;
- qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i;
-
- if( pxWorkers[ i ].xInboxID != NULL )
- {
- ( void ) mq_close( pxWorkers[ i ].xInboxID );
- ( void ) mq_unlink( qName );
- }
- }
-
- /* Have something on console. */
- if( iStatus == 0 )
- {
- printf( "All threads finished. %s", LINE_BREAK );
- }
- else
- {
- printf( "Queues did not get initialized properly. Did not run demo. %s", LINE_BREAK );
- }
-
/* This task was created with the native xTaskCreate() API function, so
must not run off the end of its implementing thread. */
- /* vTaskDelete( NULL ); */
+ /* vTaskDelete(NULL); */
}
static void *handshake_func(void * pvArgs)
@@ -383,7 +125,29 @@ static void *handshake_func(void * pvArgs)
return NULL;
}
-hs_status_t handshake(uint32_t device_id)
+static void *dataflow_func(void *pvArgs)
+{
+ printf("Dataflow thread started %s", LINE_BREAK);
+ for (;;) {
+ for (int device_idx = 0; device_idx < BUS_DEVICE_LIMIT-1; device_idx++) {
+ if (dev_sts[device_idx] == REGISTERED) {
+ device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_TX, 0);
+ }
+ }
+ }
+ return NULL;
+}
+
+static void *routing_func(void *pvArgs)
+{
+ printf("Routing thread started %s", LINE_BREAK);
+ for (;;) {
+ routing();
+ cmd_routing();
+ }
+ return NULL;
+}
+hs_status_t handshake(uint32_t i2c_addr)
{
/* Handshake variables */
@@ -454,10 +218,10 @@ hs_status_t handshake(uint32_t device_id)
hs_sts = HS_FAILED;
}
else {
- device_info[dev_idx] = malloc(sizeof(device_info_t));
- device_info[dev_idx]->i2c_addr = i2c_addr;
- device_info[dev_idx]->device_id = dev_idx;
- device_info[dev_idx]->MDR = MDR_res_message;
+ device_info[dev_idx] = malloc(sizeof(device_info_t));
+ device_info[dev_idx]->i2c_addr = i2c_addr;
+ device_info[dev_idx]->device_id = dev_idx;
+ device_info[dev_idx]->MDR = MDR_res_message;
hs_sts = HS_REGISTERED;
}
@@ -469,6 +233,264 @@ hs_status_t handshake(uint32_t device_id)
return hs_sts;
}
+dataflow_status_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile uint8_t rbuf_idx)
+{
+ uint8_t dev_idx = GET_IDX_FROM_ADDR(i2c_addr);
+ dataflow_status_t df_status = DF_IDLE;
+ uint8_t CTS_buf[2] = {0x2, 0xFF};
+
+ uint8_t DOC_buf[4];
+ uint8_t cmd_dest;
+
+ uint32_t data_len = 0;
+
+ void **vptr = malloc(sizeof(uint8_t*));
+ vptr[0] = malloc(sizeof(uint8_t*));
+ vptr[0] = &i2c_addr;
+
+ /* TODO Add default values to the CTS message in proto */
+#if defined(TESTING_ENABLE) || defined(DEBUG_ENABLE)
+ uint8_t debug_buf[128]={0};
+#endif
+
+ while (df_status != DF_SUCCESS && df_status != DF_FAIL) {
+ switch (df_status) {
+ case (DF_IDLE):
+ {
+ uint8_t SOR_buf[2] = {SOR_code, 0x0};
+ if (stream.write(SOR_buf, 2, vptr, &stream) != 0) {
+ df_status = DF_FAIL;
+ break;
+ }
+ else {
+ if (SOR_code == SLAVE_TX) {
+ df_status = DF_RX_DOC;
+ }
+ else if (SOR_code == SLAVE_RX_DATAPOINT) {
+ df_status = DF_TX_DATA;
+ }
+ else if (SOR_code == SLAVE_RX_COMMAND) {
+ df_status = DF_TX_CMD;
+ }
+ }
+ break;
+ }
+
+ case (DF_RX_DOC):
+ {
+ if (stream.read(DOC_buf, 4, vptr, &stream) != 0) {
+ df_status = DF_FAIL;
+ break;
+ }
+ else {
+ if (DOC_buf[0] == 0x0) {
+ /* Do nothing DOC; should become redundant once dataflow is initiated using classes */
+ df_status = DF_SUCCESS;
+ }
+ else if (DOC_buf[1] == DATA) {
+ df_status = DF_CTS;
+ data_len = DOC_buf[3];
+ }
+ else if (DOC_buf[1] == CMD_UNICAST) {
+ df_status = DF_CTS;
+ cmd_dest = DOC_buf[0];
+ data_len = DOC_buf[3];
+ }
+ else if (DOC_buf[1] == CMD_MULTICAST) {
+ /* TODO */
+ }
+ else if (DOC_buf[1] == CMD_BROADCAST) {
+ /* TODO */
+ }
+ else {
+ df_status = DF_FAIL;
+ }
+ }
+ break;
+ }
+
+ case (DF_CTS):
+ {
+ if (stream.write(CTS_buf, 2, vptr, &stream) != 0) {
+ df_status = DF_FAIL;
+ }
+ else {
+ if (DOC_buf[1] == DATA) {
+ df_status = DF_RX_DATA;
+ }
+ else {
+ if (DOC_buf[1] == CMD_UNICAST) {
+ df_status = DF_RX_CMD;
+ }
+ }
+ }
+ break;
+ }
+
+ case (DF_RX_DATA):
+ {
+ data_rbuf[data_routing_ptr] = malloc(sizeof(uint8_t)*data_len);
+ if (stream.read(data_rbuf[data_routing_ptr], data_len, vptr, &stream) != 0) {
+ df_status = DF_FAIL;
+ free(data_rbuf[data_routing_ptr]);
+ break;
+ }
+ else {
+ data_src_idx_rbuf[data_routing_ptr] = dev_idx;
+ data_len_buf[data_routing_ptr] = (uint8_t)data_len;
+ data_routing_ptr++;
+ df_status = DF_SUCCESS;
+ }
+ break;
+ }
+
+ case (DF_TX_DATA):
+ {
+ /* TODO error checking */
+ /* Will need to package datapoint and MDR to know their lengths
+ Once cached, will not need to do this */
+
+ /* Do this after handshake to cache ==================================== */
+ uint8_t MDR_buf[128];
+ uint8_t src_device_idx = data_src_idx_rbuf[rbuf_idx];
+ s2m_MDR_response data_src_MDR = device_info[src_device_idx]->MDR;
+ pb_ostream_t MDR_ostream = pb_ostream_from_buffer(MDR_buf, sizeof(MDR_buf));
+ data_src_MDR.subscriptions.funcs.encode=master_encode_MDR_callback;
+ pb_encode(&MDR_ostream, s2m_MDR_response_fields, &data_src_MDR);
+ uint8_t MDR_len = MDR_ostream.bytes_written;
+ /* ==================================================================== */
+
+ uint8_t data_len = data_len_buf[rbuf_idx];
+ uint8_t data_MDR_len_buf[4] = {0, MDR_len, 0, data_len};
+
+ uint8_t status = get_CTS(i2c_addr);
+
+ if (status != 0 ||
+ stream.write(data_MDR_len_buf, 4, vptr, &stream) != 0);
+
+ status = get_CTS(i2c_addr);
+ if (status == 0 &&
+ stream.write(MDR_buf, MDR_len, vptr, &stream) == 0 &&
+ stream.write(data_rbuf[rbuf_idx], data_len, vptr, &stream) == 0) {
+ df_status = DF_SUCCESS;
+ }
+ else {
+ df_status = DF_FAIL;
+ }
+ break;
+ }
+
+ case (DF_RX_CMD):
+ {
+ uint8_t *cmd_buf;
+ cmd_buf = (uint8_t*)malloc(data_len);
+
+ if (stream.read(cmd_buf, data_len, vptr, &stream) != 0) {
+ free(cmd_buf);
+ df_status = DF_FAIL;
+ }
+ else {
+ cmd_routing_buf[cmd_routing_ptr] = malloc(sizeof(uint8_t)*data_len);
+ memcpy(cmd_routing_buf[cmd_routing_ptr], cmd_buf, data_len);
+ free(cmd_buf);
+
+ cmd_src_idx_rbuf[cmd_routing_ptr] = dev_idx;
+ cmd_dst_idx_rbuf[cmd_routing_ptr] = cmd_dest;
+ cmd_routing_ptr++;
+
+ df_status = DF_SUCCESS;
+ }
+ break;
+ }
+
+ case (DF_TX_CMD):
+ {
+ uint8_t data_len = sizeof(cmd_routing_buf[rbuf_idx]);
+ uint8_t len_buf[] = {0x0, data_len, 0x0, 0x0};
+
+ uint8_t status = get_CTS(i2c_addr);
+ if (status == 0 &&
+ stream.write(len_buf, 4, vptr, &stream) == 0) {
+ df_status = DF_FAIL;
+ }
+
+ if (df_status != DF_FAIL) {
+ uint8_t status = get_CTS(i2c_addr);
+ if (status == 0 &&
+ stream.write(cmd_routing_buf[rbuf_idx], data_len, vptr, &stream) == 0) {
+ df_status = DF_SUCCESS;
+ }
+ else {
+ df_status = DF_FAIL;
+ }
+ }
+ break;
+ }
+ case DF_SUCCESS:
+ case DF_FAIL:
+ break;
+ }
+ }
+
+ return df_status;
+}
+
+bool routing(void)
+{
+ /* This table holds information on where to send each datapoint in the routing buffer */
+ uint32_t routing_table[ROUTING_BUFSIZE][4] = {{0, 0}};
+
+ /* Build table with routing information */
+ for (uint8_t rbuf_data_idx = 0; rbuf_data_idx < data_routing_ptr; rbuf_data_idx++) {
+ uint8_t src_module_idx = data_src_idx_rbuf[rbuf_data_idx];
+ for (uint8_t dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT; dev_idx++) {
+ if (!(GET_BIT_FROM_IDX(allocated, dev_idx) && 1)) { // No module at this index
+ continue;
+ }
+ bool alloc = false;
+ for (uint8_t dev_sub_idx = 0;
+ dev_sub_idx < subs_info[dev_idx]->mod_idx && !alloc; dev_sub_idx++) {
+ if (subs_info[dev_idx]->module_ids[dev_sub_idx] ==
+ device_info[src_module_idx]->MDR.module_id) {
+ SET_BIT_FROM_IDX(routing_table[rbuf_data_idx], dev_idx);
+ alloc = true;
+ }
+ }
+ /* TODO entity ID, I2C addr and class routing, should go in the if condition above */
+ }
+ }
+
+ for (uint8_t rbuf_data_idx = 0; rbuf_data_idx < data_routing_ptr; rbuf_data_idx++) {
+ for (uint8_t device_idx = 0; device_idx < BUS_DEVICE_LIMIT; device_idx++) {
+ if (GET_BIT_FROM_IDX(allocated, device_idx) &&
+ GET_BIT_FROM_IDX(routing_table[rbuf_data_idx], device_idx)) {
+ device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_RX_DATAPOINT, rbuf_data_idx);
+ }
+ }
+ free(data_rbuf[rbuf_data_idx]);
+ }
+
+ /* Reset the routing pointer, since all data in buffer should have been routed */
+ data_routing_ptr = 0;
+ return true;
+}
+
+bool cmd_routing(void)
+{
+ /* uint32_t routing_table[4]; */
+ for (uint8_t rbuf_cmd_idx = 0; rbuf_cmd_idx < cmd_routing_ptr; rbuf_cmd_idx++) {
+ uint8_t dst_module_idx = cmd_dst_idx_rbuf[rbuf_cmd_idx];
+ for (int dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT; dev_idx++) {
+ if (GET_BIT_FROM_IDX(allocated, dev_idx) &&
+ device_info[dev_idx]->MDR.module_id == dst_module_idx) {
+ device_dataflow(GET_ADDR_FROM_IDX(dev_idx), SLAVE_RX_COMMAND, rbuf_cmd_idx);
+ }
+ }
+ free(cmd_routing_buf[rbuf_cmd_idx]);
+ }
+ cmd_routing_ptr = 0;
+ return true;
+}
bool todo_hs_or_not_todo_hs(uint8_t i2c_addr)
{
@@ -542,3 +564,28 @@ bool decode_subscriptions_callback(pb_istream_t *istream, const pb_field_t *fiel
subs.i2c_address;
return true;
}
+
+bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg)
+{
+ if (!pb_encode_tag_for_field(ostream, field)) {
+ return false;
+ }
+ return true;
+}
+
+uint8_t get_CTS(uint8_t i2c_addr)
+{
+ uint8_t CTS_buf[2];
+
+ void **vptr = malloc(sizeof(uint8_t*));
+ vptr[0] = malloc(sizeof(uint8_t*));
+ vptr[0] = &i2c_addr;
+
+ uint8_t status = stream.read(CTS_buf, 2, vptr, &stream);
+ if (status == 0 && CTS_buf[1] == 0x1) {
+ return 0;
+ }
+ else {
+ return 1;
+ }
+}