summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAditya Naik2020-08-11 16:34:10 -0400
committerAditya Naik2020-08-11 16:34:10 -0400
commit7613a31ff1d45a4fb5c51b7164c1dc10d34e01b5 (patch)
treec0f096ddfa2b2769e56cef6064ef0bb6d64b0dd7
parent8c3fae3116c014ff4336ca38bccbe368a9553b63 (diff)
TX functions for dataflow untestedstate_machine
-rw-r--r--src/dataflow.c40
-rw-r--r--src/master_posix.c173
2 files changed, 71 insertions, 142 deletions
diff --git a/src/dataflow.c b/src/dataflow.c
index 22a50ab..0f05703 100644
--- a/src/dataflow.c
+++ b/src/dataflow.c
@@ -133,7 +133,7 @@ int DF_func_rx_cmd(p_stream_t stream, void **args)
uint8_t data_len = DOC_buf[3], cmd_dest = DOC_buf[0];
uint8_t **cmd_routing_buf = (uint8_t**)args[6];
- uint8_t *cmd_routing_ptr = (uint8_t*)args[7];
+ uint8_t *cmd_routing_ptr = (uint8_t*)args[7];
uint8_t* cmd_src_idx_rbuf = (uint8_t*)args[8];
uint8_t* cmd_dst_idx_rbuf = (uint8_t*)args[9];
@@ -148,17 +148,51 @@ int DF_func_rx_cmd(p_stream_t stream, void **args)
cmd_src_idx_rbuf[*cmd_routing_ptr] = 0;
cmd_dst_idx_rbuf[*cmd_routing_ptr] = cmd_dest;
cmd_routing_ptr++;
- df_state = DF_SUCCESS;
+ df_state = DF_STATE_SUCCESS;
}
return df_state;
}
int DF_func_tx_data(p_stream_t stream, void **args)
{
- /* TODO Do this after MDR is cached after handshake, let's not encode it again here */
+ int df_state = DF_RX_DATA;
+ uint8_t **MDR_rbuf = (uint8_t**)args[10];
+ uint8_t *MDR_len_buf = (uint8_t*)args[12];
+ uint8_t *data_len_buf = (uint8_t*)args[5];
+ uint8_t rbuf_idx = *(uint8_t*)args[11];
+ uint8_t **data_rbuf = (uint8_t**)args[2];
+ uint8_t CTS_buf[2];
+ uint8_t data_MDR_len_buf[4] = {0, MDR_len_buf[rbuf_idx], 0, data_len_buf[rbuf_idx]};
+
+ if (stream.read(CTS_buf, 2, NULL, &stream) != 0)
+ df_state = DF_STATE_FAIL;
+ else if (CTS_buf[1] != 0x1)
+ df_state = DF_STATE_FAIL;
+ if (df_state != DF_STATE_FAIL) {
+ if (stream.write(data_MDR_len_buf, 4, NULL, &stream) != 0)
+ df_state = DF_STATE_FAIL;
+ else {
+ if (stream.read(CTS_buf, 2, NULL, &stream) != 0)
+ df_state = DF_STATE_FAIL;
+ else if (CTS_buf[1] != 0x1)
+ df_state = DF_STATE_FAIL;
+ }
+ }
+ if (df_state != DF_STATE_FAIL) {
+ if (stream.write(MDR_rbuf[rbuf_idx], MDR_len_buf[rbuf_idx], NULL, &stream) == 0 &&
+ stream.write(data_rbuf[rbuf_idx], data_len_buf[rbuf_idx], NULL, &stream) == 0)
+ df_state = DF_STATE_SUCCESS;
+ else
+ df_state = DF_STATE_FAIL;
+ }
+ return df_state;
}
int DF_func_tx_cmd(p_stream_t stream, void **args)
{
+ /* TODO This function will not work unless there is a cmd_len buffer */
+ uint8_t rbuf_idx = *(uint8_t*)args[11];
+ uint8_t **cmd_routing_buf = (uint8_t**)args[6];
+ return DF_STATE_FAIL;
}
diff --git a/src/master_posix.c b/src/master_posix.c
index ff8f2b0..4b3da54 100644
--- a/src/master_posix.c
+++ b/src/master_posix.c
@@ -75,6 +75,9 @@ uint8_t data_idx;
uint8_t *data_rbuf[ROUTING_BUFSIZE]; /*< Buffer to store data to be routed */
uint8_t *cmd_routing_buf[ROUTING_BUFSIZE]; /*< Buffer to store commands to be routed */
+uint8_t *MDR_rbuf[BUS_DEVICE_LIMIT]; /*< Buffer to store encoded MDR */
+uint8_t *MDR_len_buf[BUS_DEVICE_LIMIT]; /*< Buffer to store encoded MDR lengths */
+
uint8_t data_src_idx_rbuf[ROUTING_BUFSIZE]; /*< Index information for data source */
uint8_t cmd_src_idx_rbuf[ROUTING_BUFSIZE]; /*< Index information for command source */
uint8_t cmd_dst_idx_rbuf[ROUTING_BUFSIZE]; /*< Index information for command dest */
@@ -90,10 +93,9 @@ static void *handshake_func(void * pvArgs);
static void *dataflow_func(void *pvArgs);
static void *routing_func(void *pvArgs);
-dataflow_states_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, uint8_t routing_buf_idx);
bool routing(void);
bool cmd_routing(void);
-uint8_t get_CTS(uint8_t i2c_addr);
+
bool todo_hs_or_not_todo_hs(uint8_t i2c_addr);
state_t get_state_from_hs_status(uint16_t device_addr, hs_status_t hs_status);
bool decode_subscriptions_callback(pb_istream_t *istream, const pb_field_t *field, void **args);
@@ -176,12 +178,19 @@ static void *handshake_func(void * pvArgs)
hs_state = hs_jumptable[hs_state](stream, args);
if (hs_state == HS_STATE_SUCCESS) {
+ uint8_t MDR_len = ((uint8_t*)args[0])[0];
+ /* Store the encoded MDR buffer */
+ MDR_rbuf[dev_idx] = malloc(sizeof(uint8_t)*MDR_len);
+ memcpy(MDR_rbuf[dev_idx], args[1], MDR_len);
+ free(args[1]);
+
+ MDR_len_buf[dev_idx] = &MDR_len;
+
/* Attempt to decode the protobuf, and add devices */
s2m_MDR_response MDR_res_message = s2m_MDR_response_init_default;
MDR_res_message.subscriptions.funcs.decode = decode_subscriptions_callback;
MDR_res_message.subscriptions.arg = (void*)dev_idx;
- pb_istream_t MDR_res_stream = pb_istream_from_buffer(args[1],
- ((uint8_t*)args[0])[0]);
+ pb_istream_t MDR_res_stream = pb_istream_from_buffer(MDR_rbuf[dev_idx], MDR_len);
if (!pb_decode(&MDR_res_stream, s2m_MDR_response_fields, &MDR_res_message)) {
printf("decode fail\n");
hs_state = HS_STATE_FAIL;
@@ -209,12 +218,26 @@ static void *dataflow_func(void *pvArgs)
printf("Dataflow thread started %s", LINE_BREAK);
void **args;
- args = malloc(sizeof(uint8_t*)*10);
- args[0] = malloc(sizeof(uint8_t*));
- uint8_t SOR_code = SLAVE_TX;
+ args = malloc(sizeof(uint8_t*)*11);
+ int x;
+ for (x = 0; x < 11; x++)
+ args[x] = malloc(sizeof(uint8_t*));
+
+ uint8_t rbuf_idx = 0;
+
+ args[2] = &data_rbuf;
+ args[3] = &data_routing_ptr;
+ args[4] = &data_src_idx_rbuf;
+ args[5] = &data_len_buf;
+ args[6] = &cmd_routing_buf;
+ args[7] = &cmd_routing_ptr;
+ args[8] = &cmd_src_idx_rbuf;
+ args[9] = &cmd_dst_idx_rbuf;
+ args[10] = &MDR_rbuf;
+ args[11] = &rbuf_idx;
+ uint8_t SOR_code = SLAVE_TX;
args[0] = &SOR_code;
-
int df_state = DF_IDLE;
while (df_state != DF_STATE_FAIL && df_state != DF_STATE_SUCCESS)
df_state = df_jumptable[df_state](stream, args);
@@ -232,119 +255,6 @@ static void *routing_func(void *pvArgs)
return NULL;
}
-dataflow_states_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile uint8_t rbuf_idx)
-{
- uint8_t dev_idx = GET_IDX_FROM_ADDR(i2c_addr);
- dataflow_states_t df_status = DF_IDLE;
- uint8_t CTS_buf[2] = {0x2, 0xFF};
-
- uint8_t DOC_buf[4];
- uint8_t cmd_dest;
-
- uint32_t data_len = 0;
-
- void **vptr = malloc(sizeof(uint8_t*));
- vptr[0] = malloc(sizeof(uint8_t*));
- vptr[0] = &i2c_addr;
-
- /* TODO Add default values to the CTS message in proto */
-#if defined(TESTING_ENABLE) || defined(DEBUG_ENABLE)
- uint8_t debug_buf[128]={0};
-#endif
-
- while (df_status != DF_SUCCESS && df_status != DF_FAIL) {
- switch (df_status) {
- case (DF_TX_DATA):
- {
- /* TODO error checking */
- /* Will need to package datapoint and MDR to know their lengths
- Once cached, will not need to do this */
-
- /* Do this after handshake to cache ==================================== */
- uint8_t MDR_buf[128];
- uint8_t src_device_idx = data_src_idx_rbuf[rbuf_idx];
- s2m_MDR_response data_src_MDR = device_info[src_device_idx]->MDR;
- pb_ostream_t MDR_ostream = pb_ostream_from_buffer(MDR_buf, sizeof(MDR_buf));
- data_src_MDR.subscriptions.funcs.encode=master_encode_MDR_callback;
- pb_encode(&MDR_ostream, s2m_MDR_response_fields, &data_src_MDR);
- uint8_t MDR_len = MDR_ostream.bytes_written;
- /* ==================================================================== */
-
- uint8_t data_len = data_len_buf[rbuf_idx];
- uint8_t data_MDR_len_buf[4] = {0, MDR_len, 0, data_len};
-
- uint8_t status = get_CTS(i2c_addr);
-
- if (status != 0 ||
- stream.write(data_MDR_len_buf, 4, vptr, &stream) != 0);
-
- status = get_CTS(i2c_addr);
- if (status == 0 &&
- stream.write(MDR_buf, MDR_len, vptr, &stream) == 0 &&
- stream.write(data_rbuf[rbuf_idx], data_len, vptr, &stream) == 0) {
- df_status = DF_SUCCESS;
- }
- else {
- df_status = DF_FAIL;
- }
- break;
- }
-
- case (DF_RX_CMD):
- {
- uint8_t *cmd_buf;
- cmd_buf = (uint8_t*)malloc(data_len);
-
- if (stream.read(cmd_buf, data_len, vptr, &stream) != 0) {
- free(cmd_buf);
- df_status = DF_FAIL;
- }
- else {
- cmd_routing_buf[cmd_routing_ptr] = malloc(sizeof(uint8_t)*data_len);
- memcpy(cmd_routing_buf[cmd_routing_ptr], cmd_buf, data_len);
- free(cmd_buf);
-
- cmd_src_idx_rbuf[cmd_routing_ptr] = dev_idx;
- cmd_dst_idx_rbuf[cmd_routing_ptr] = cmd_dest;
- cmd_routing_ptr++;
-
- df_status = DF_SUCCESS;
- }
- break;
- }
-
- case (DF_TX_CMD):
- {
- uint8_t data_len = sizeof(cmd_routing_buf[rbuf_idx]);
- uint8_t len_buf[] = {0x0, data_len, 0x0, 0x0};
-
- uint8_t status = get_CTS(i2c_addr);
- if (status == 0 &&
- stream.write(len_buf, 4, vptr, &stream) == 0) {
- df_status = DF_FAIL;
- }
-
- if (df_status != DF_FAIL) {
- uint8_t status = get_CTS(i2c_addr);
- if (status == 0 &&
- stream.write(cmd_routing_buf[rbuf_idx], data_len, vptr, &stream) == 0) {
- df_status = DF_SUCCESS;
- }
- else {
- df_status = DF_FAIL;
- }
- }
- break;
- }
- case DF_SUCCESS:
- case DF_FAIL:
- break;
- }
- }
-
- return df_status;
-}
-
bool routing(void)
{
/* This table holds information on where to send each datapoint in the routing buffer */
@@ -374,7 +284,8 @@ bool routing(void)
for (uint8_t device_idx = 0; device_idx < BUS_DEVICE_LIMIT; device_idx++) {
if (GET_BIT_FROM_IDX(allocated, device_idx) &&
GET_BIT_FROM_IDX(routing_table[rbuf_data_idx], device_idx)) {
- device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_RX_DATAPOINT, rbuf_data_idx);
+ /* TODO modify this to work with state machine */
+ /* device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_RX_DATAPOINT, rbuf_data_idx); */
}
}
free(data_rbuf[rbuf_data_idx]);
@@ -393,7 +304,7 @@ bool cmd_routing(void)
for (int dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT; dev_idx++) {
if (GET_BIT_FROM_IDX(allocated, dev_idx) &&
device_info[dev_idx]->MDR.module_id == dst_module_idx) {
- device_dataflow(GET_ADDR_FROM_IDX(dev_idx), SLAVE_RX_COMMAND, rbuf_cmd_idx);
+ /* device_dataflow(GET_ADDR_FROM_IDX(dev_idx), SLAVE_RX_COMMAND, rbuf_cmd_idx); */
}
}
free(cmd_routing_buf[rbuf_cmd_idx]);
@@ -483,19 +394,3 @@ bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field,
return true;
}
-uint8_t get_CTS(uint8_t i2c_addr)
-{
- uint8_t CTS_buf[2];
-
- void **vptr = malloc(sizeof(uint8_t*));
- vptr[0] = malloc(sizeof(uint8_t*));
- vptr[0] = &i2c_addr;
-
- uint8_t status = stream.read(CTS_buf, 2, vptr, &stream);
- if (status == 0 && CTS_buf[1] == 0x1) {
- return 0;
- }
- else {
- return 1;
- }
-}