diff options
| author | Aditya Naik | 2020-08-11 13:47:05 -0400 |
|---|---|---|
| committer | Aditya Naik | 2020-08-11 13:47:05 -0400 |
| commit | 8c3fae3116c014ff4336ca38bccbe368a9553b63 (patch) | |
| tree | 9251b073d5864ec4d62c1bd31e34307a94c9314e | |
| parent | 5ddee067e227a98740dc17a35b2eca0d3260d079 (diff) | |
Tested one branch of dataflow
| -rw-r--r-- | include/dataflow.h | 14 | ||||
| -rw-r--r-- | src/dataflow.c | 66 | ||||
| -rw-r--r-- | src/master_posix.c | 129 |
3 files changed, 63 insertions, 146 deletions
diff --git a/include/dataflow.h b/include/dataflow.h index 4d6b6c0..38bf525 100644 --- a/include/dataflow.h +++ b/include/dataflow.h @@ -19,7 +19,7 @@ * |------------+-----+----------------------------------------+---------------------+-------------------------| * * -*/ + */ typedef enum SOR_codes { SLAVE_TX = 1, @@ -43,12 +43,12 @@ typedef enum DOC_codes { #define DF_STATE_TABLE(ENTRY) \ ENTRY(DF_IDLE, DF_func_idle) \ - ENTRY(DF_RX_DOC, DF_rx_doc) \ - ENTRY(DF_RX_CTS, DF_rx_cts) \ - ENTRY(DF_RX_DATA, DF_rx_data) \ - ENTRY(DF_RX_CMD, DF_rx_cmd) \ - ENTRY(DF_TX_DATA, DF_tx_data) \ - ENTRY(DF_TX_CMD, DF_tx_cmd) + ENTRY(DF_RX_DOC, DF_func_rx_doc) \ + ENTRY(DF_TX_CTS, DF_func_tx_cts) \ + ENTRY(DF_RX_DATA, DF_func_rx_data) \ + ENTRY(DF_RX_CMD, DF_func_rx_cmd) \ + ENTRY(DF_TX_DATA, DF_func_tx_data) \ + ENTRY(DF_TX_CMD, DF_func_tx_cmd) typedef enum dataflow_states { DF_STATE_TABLE(EXPAND_AS_ENUM) diff --git a/src/dataflow.c b/src/dataflow.c index e85124a..22a50ab 100644 --- a/src/dataflow.c +++ b/src/dataflow.c @@ -26,24 +26,26 @@ int DF_func_idle(p_stream_t stream, void **args) { int df_state = DF_IDLE; - uint32_t SOR_code = *(uint32_t*)args[0]; + uint8_t SOR_code = *(uint8_t*)args[0]; uint8_t SOR_buf[2] = {SOR_code, 0x0}; if (stream.write(SOR_buf, 2, NULL, &stream) != 0) { df_state = DF_STATE_FAIL; } else { - if (SOR_code == SLAVE_TX) { + switch (SOR_code) { + case SLAVE_TX: df_state = DF_RX_DOC; - } - else if (SOR_code == SLAVE_RX_DATAPOINT) { + break; + case SLAVE_RX_DATAPOINT: df_state = DF_TX_DATA; - } - else if (SOR_code == SLAVE_RX_COMMAND) { + break; + case SLAVE_RX_COMMAND: df_state = DF_TX_CMD; - } - else { + break; + default: df_state = DF_STATE_FAIL; + break; } } return df_state; @@ -51,50 +53,48 @@ int DF_func_idle(p_stream_t stream, void **args) int DF_func_rx_doc(p_stream_t stream, void **args) { - uint8_t DOC_buf[4]; int df_state = DF_RX_DOC; - if (stream.read(DOC_buf, 4, NULL, &stream) != 0) { + args[1] = malloc(sizeof(uint8_t)*4); + + if (stream.read(args[1], 4, NULL, &stream) != 0) { + free(args[1]); df_state = DF_STATE_FAIL; } else { - if (DOC_buf[0] == 0x0) { - /* Do nothing DOC */ + switch (((uint8_t*)args[1])[1]) { + case 0x0: df_state = DF_SUCCESS; - } - else if (DOC_buf[1] == DATA) { - df_state = DF_RX_CTS; - args[1] = malloc(sizeof(uint8_t*)); - args[1] = &DOC_buf; /* Put pointer to DOC buf in this arg */ - - } - else if (DOC_buf[1] == CMD_UNICAST) { - df_state = DF_RX_CTS; - args[1] = malloc(sizeof(uint8_t*)); - args[1] = &DOC_buf; /* Put pointer to DOC buf in this arg */ - } - else { + break; + case DATA: + df_state = DF_TX_CTS; + break; + case CMD_UNICAST: + df_state = DF_TX_CTS; + break; + default: df_state = DF_STATE_FAIL; + break; } } return df_state; } -int DF_func_rx_cts(p_stream_t stream, void **args) +int DF_func_tx_cts(p_stream_t stream, void **args) { - int df_state = DF_RX_CTS; + int df_state = DF_TX_CTS; uint8_t CTS_buf[2] = {0x2, 0xFF}; uint8_t *DOC_buf = args[1]; if (stream.write(CTS_buf, 2, NULL, &stream) != 0) { df_state = DF_FAIL; } else { - if (DOC_buf[1] == DATA) { + switch (DOC_buf[1]) { + case DATA: df_state = DF_RX_DATA; - } - else { - if (DOC_buf[1] == CMD_UNICAST) { - df_state = DF_RX_CMD; - } + break; + case CMD_UNICAST: + df_state = DF_RX_CMD; + break; } } return df_state; diff --git a/src/master_posix.c b/src/master_posix.c index 54a94bd..ff8f2b0 100644 --- a/src/master_posix.c +++ b/src/master_posix.c @@ -59,11 +59,12 @@ hs_func_t hs_jumptable[NUM_HS_STATES] = { HS_STATE_TABLE(EXPAND_AS_JUMPTABLE) }; -p_stream_t device_streams[2]; +df_func_t df_jumptable[NUM_DF_STATES] = { + DF_STATE_TABLE(EXPAND_AS_JUMPTABLE) +}; -/* df_func_t df_jumptable[NUM_DF_STATES] = { */ -/* DF_STATE_TABLE(EXPAND_AS_JUMPTABLE) */ -/* }; */ + +p_stream_t device_streams[2]; device_info_t *device_info[BUS_DEVICE_LIMIT] = {NULL}; subscription_info_t* subs_info[BUS_DEVICE_LIMIT]; @@ -119,9 +120,9 @@ void vStartPOSIXMaster(void *pvParams) /* This is the global stream that is being used by handhsake and dataflow */ STREAM_INIT(STDIO, NULL, stream); - pthread_create(&handshake_thread, NULL, handshake_func, NULL); + /* pthread_create(&handshake_thread, NULL, handshake_func, NULL); */ pthread_create(&dataflow_thread, NULL, dataflow_func, NULL); - pthread_create(&routing_thread, NULL, routing_func, NULL); + /* pthread_create(&routing_thread, NULL, routing_func, NULL); */ /* This function will be defined for the port @@ -180,7 +181,7 @@ static void *handshake_func(void * pvArgs) MDR_res_message.subscriptions.funcs.decode = decode_subscriptions_callback; MDR_res_message.subscriptions.arg = (void*)dev_idx; pb_istream_t MDR_res_stream = pb_istream_from_buffer(args[1], - *(uint8_t*)args[0]); + ((uint8_t*)args[0])[0]); if (!pb_decode(&MDR_res_stream, s2m_MDR_response_fields, &MDR_res_message)) { printf("decode fail\n"); hs_state = HS_STATE_FAIL; @@ -190,8 +191,8 @@ static void *handshake_func(void * pvArgs) device_info[dev_idx] = malloc(sizeof(device_info_t)); device_info[dev_idx]->i2c_addr = 0x0; device_info[dev_idx]->device_id = dev_idx; - device_info[dev_idx]->MDR = MDR_res_message; - hs_state = HS_REGISTERED; + device_info[dev_idx]->MDR = MDR_res_message; + hs_state = HS_REGISTERED; } /* } */ @@ -206,13 +207,18 @@ static void *handshake_func(void * pvArgs) static void *dataflow_func(void *pvArgs) { printf("Dataflow thread started %s", LINE_BREAK); - for (;;) { - for (int device_idx = 0; device_idx < BUS_DEVICE_LIMIT-1; device_idx++) { - if (dev_sts[device_idx] == REGISTERED) { - device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_TX, 0); - } - } - } + + void **args; + args = malloc(sizeof(uint8_t*)*10); + args[0] = malloc(sizeof(uint8_t*)); + uint8_t SOR_code = SLAVE_TX; + + args[0] = &SOR_code; + + int df_state = DF_IDLE; + while (df_state != DF_STATE_FAIL && df_state != DF_STATE_SUCCESS) + df_state = df_jumptable[df_state](stream, args); + printf("Dataflow thread ended %s", LINE_BREAK); return NULL; } @@ -247,96 +253,7 @@ dataflow_states_t device_dataflow(uint8_t i2c_addr, uint32_t SOR_code, volatile #endif while (df_status != DF_SUCCESS && df_status != DF_FAIL) { - switch (df_status) { - case (DF_IDLE): - { - uint8_t SOR_buf[2] = {SOR_code, 0x0}; - if (stream.write(SOR_buf, 2, vptr, &stream) != 0) { - df_status = DF_FAIL; - break; - } - else { - if (SOR_code == SLAVE_TX) { - df_status = DF_RX_DOC; - } - else if (SOR_code == SLAVE_RX_DATAPOINT) { - df_status = DF_TX_DATA; - } - else if (SOR_code == SLAVE_RX_COMMAND) { - df_status = DF_TX_CMD; - } - } - break; - } - - case (DF_RX_DOC): - { - if (stream.read(DOC_buf, 4, vptr, &stream) != 0) { - df_status = DF_FAIL; - break; - } - else { - if (DOC_buf[0] == 0x0) { - /* Do nothing DOC; should become redundant once dataflow is initiated using classes */ - df_status = DF_SUCCESS; - } - else if (DOC_buf[1] == DATA) { - df_status = DF_RX_CTS; - data_len = DOC_buf[3]; - } - else if (DOC_buf[1] == CMD_UNICAST) { - df_status = DF_RX_CTS; - cmd_dest = DOC_buf[0]; - data_len = DOC_buf[3]; - } - else if (DOC_buf[1] == CMD_MULTICAST) { - /* TODO */ - } - else if (DOC_buf[1] == CMD_BROADCAST) { - /* TODO */ - } - else { - df_status = DF_FAIL; - } - } - break; - } - - case (DF_RX_CTS): - { - if (stream.write(CTS_buf, 2, vptr, &stream) != 0) { - df_status = DF_FAIL; - } - else { - if (DOC_buf[1] == DATA) { - df_status = DF_RX_DATA; - } - else { - if (DOC_buf[1] == CMD_UNICAST) { - df_status = DF_RX_CMD; - } - } - } - break; - } - - case (DF_RX_DATA): - { - data_rbuf[data_routing_ptr] = malloc(sizeof(uint8_t)*data_len); - if (stream.read(data_rbuf[data_routing_ptr], data_len, vptr, &stream) != 0) { - df_status = DF_FAIL; - free(data_rbuf[data_routing_ptr]); - break; - } - else { - data_src_idx_rbuf[data_routing_ptr] = dev_idx; - data_len_buf[data_routing_ptr] = (uint8_t)data_len; - data_routing_ptr++; - df_status = DF_SUCCESS; - } - break; - } - + switch (df_status) { case (DF_TX_DATA): { /* TODO error checking */ |
