/*
 * path: root/src/master_posix.c
 * blob: 4b3da547307a31f840985625c40230b766df6442
 */
/* 
 * TODO Do conditional includes based on which target we are building for.
 *   Actually, target-specific includes should go in the port folder
 *   This should be specified in the master config file
 * TODO This file should be moved to the posix port folder
 *
 */

/* FreeRTOS includes. */
/* #include "FreeRTOS_POSIX.h" */

/* System headers. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Library includes */
#include <pb_encode.h>
#include <pb_decode.h>

/* (Ideally) platform-agnostic project includes. */
#include "master_posix.h"
#include "main.h"
#include "devices.h"
#include "config.h"
#include "dataflow.h"
#include "handshake.pb.h"
#include "data.pb.h"
#include "stream.h"
#include "stream_stdio.h"
#include "port.h"
#include "handshake.h"

/* FreeRTOS+POSIX. should go in the port folder */
/* #include "FreeRTOS_POSIX/pthread.h" */
/* #include "FreeRTOS_POSIX/mqueue.h" */
/* #include "FreeRTOS_POSIX/time.h" */
/* #include "FreeRTOS_POSIX/fcntl.h" */
/* #include "FreeRTOS_POSIX/errno.h" */

/* Constants. */
/*
 * Constants and small helper macros.
 *
 * Every macro argument and every expansion is fully parenthesised so that the
 * macros keep their meaning when they are used inside a larger expression
 * (e.g. `!GET_BIT_FROM_IDX(a, b)` or `GET_IDX_FROM_ADDR(x) * 2`).
 */
#define LINE_BREAK    "\r\n"
#define device_MDR s2m_MDR_response
/* Address <-> zero-based index conversion: idx = (addr >> 1) - 1. */
#define GET_IDX_FROM_ADDR(i2c_addr) ((((i2c_addr)) >> 1) - 1)
#define GET_ADDR_FROM_IDX(idx)      ((((idx)) + 1) << 1)
/*
 * Test / set bit number `b` in `a`, an array of 32-bit words.
 * The mask is built from an unsigned 1 so that bit 31 does not shift into the
 * sign bit of an int.
 */
#define GET_BIT_FROM_IDX(a, b) ((a)[(b) >> 5] & (1u << ((b) % 32)))
#define SET_BIT_FROM_IDX(a, b) ((a)[(b) >> 5] |= (1u << ((b) % 32)))
/* Element count of a true array (not valid for pointers). */
#define COUNTOF(buf)   (sizeof(buf) / sizeof(*(buf)))

/*
 * Bind the stream object `sptr` to the handlers of peripheral PERIPH:
 * its read/write members are pointed at read_<PERIPH> / write_<PERIPH> and
 * its props member is set to `vptr`.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement,
 * e.g. as the body of an unbraced `if`. Call it with a trailing semicolon.
 */
#define STREAM_INIT(PERIPH, vptr, sptr)     \
    do {                                    \
        (sptr).read  = &read_##PERIPH;      \
        (sptr).write = &write_##PERIPH;     \
        (sptr).props = (vptr);              \
    } while (0)

/* Number of slots in the per-device tables below (device_info, subs_info,
 * dev_sts, MDR_rbuf, MDR_len_buf) and the upper bound of the device loops. */
#define BUS_DEVICE_LIMIT 2

/*
 * Handshake state machine: table of state handlers indexed by the current
 * state. The entries are produced by expanding HS_STATE_TABLE with
 * EXPAND_AS_JUMPTABLE (both defined outside this file). A handler is invoked
 * as handler(stream, args) and its return value is used as the next state.
 */
hs_func_t hs_jumptable[NUM_HS_STATES] = {
    HS_STATE_TABLE(EXPAND_AS_JUMPTABLE)
};

/*
 * Dataflow state machine: same layout as hs_jumptable, generated from
 * DF_STATE_TABLE and driven by dataflow_func().
 */
df_func_t df_jumptable[NUM_DF_STATES] = {
    DF_STATE_TABLE(EXPAND_AS_JUMPTABLE)
};


/* Per-device streams; both are bound to the STDIO handlers in vStartPOSIXMaster(). */
p_stream_t device_streams[2];

/* Per-device record, filled by handshake_func() once an MDR has been decoded. */
device_info_t *device_info[BUS_DEVICE_LIMIT] = {NULL};
/* Per-device subscription lists, allocated by decode_subscriptions_callback(). */
subscription_info_t* subs_info[BUS_DEVICE_LIMIT];
/* Bitmap (4 x 32 bits): bit i is set once subs_info[i] has been allocated. */
uint32_t allocated[4]={0};
/* Per-device state, read by todo_hs_or_not_todo_hs().
 * NOTE(review): `{OFFLINE}` only initialises element 0 explicitly; the other
 * elements are zero-initialised, which equals OFFLINE only if OFFLINE == 0. */
uint8_t dev_sts[BUS_DEVICE_LIMIT] = {OFFLINE};
/* Not referenced anywhere in this file. */
uint8_t data_idx;

uint8_t *data_rbuf[ROUTING_BUFSIZE];        /**< Buffer to store data to be routed; entries are freed by routing() */
uint8_t *cmd_routing_buf[ROUTING_BUFSIZE];  /**< Buffer to store commands to be routed; entries are freed by cmd_routing() */

uint8_t *MDR_rbuf[BUS_DEVICE_LIMIT];        /**< Buffer to store encoded MDR */
uint8_t *MDR_len_buf[BUS_DEVICE_LIMIT];     /**< Per-device pointer to the encoded MDR length (one byte) */

uint8_t    data_src_idx_rbuf[ROUTING_BUFSIZE];   /**< Index information for data source */
uint8_t    cmd_src_idx_rbuf[ROUTING_BUFSIZE];    /**< Index information for command source */
uint8_t    cmd_dst_idx_rbuf[ROUTING_BUFSIZE];    /**< Index information for command dest */

/* Handed to the dataflow state machine next to data_rbuf; presumably the
 * length of each data_rbuf entry - TODO confirm against the dataflow states. */
uint32_t    data_len_buf[ROUTING_BUFSIZE];

uint32_t   data_routing_ptr = 0;     /**< Pointer to tail of both data and data index buffers; reset to 0 by routing() */
uint32_t   cmd_routing_ptr = 0;      /**< Pointer to tail of cmd and cmd index buffers; reset to 0 by cmd_routing() */

/* Global stream passed to every handshake and dataflow state handler. */
p_stream_t stream;

/* Thread entry points (pthread start-routine signature). */
static void *handshake_func(void * pvArgs);
static void *dataflow_func(void *pvArgs);
static void *routing_func(void *pvArgs);

/* One routing pass over the pending data / command buffers. */
bool routing(void);
bool cmd_routing(void);

/* Device-state helpers. */
bool todo_hs_or_not_todo_hs(uint8_t i2c_addr);
state_t get_state_from_hs_status(uint16_t device_addr, hs_status_t hs_status);
/* Protobuf (nanopb) field callbacks. Only decode_subscriptions_callback and
 * master_encode_MDR_callback are defined in the part of the file shown here. */
bool decode_subscriptions_callback(pb_istream_t *istream, const pb_field_t *field, void **args);
bool encode_subscription_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg);
bool encode_datapoint_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg);
bool decode_data_callback(pb_istream_t *istream, const pb_field_t *field, void **args);
bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg);


/*-----------------------------------------------------------*/

/**
 * @brief Main function; 
 *
 * See the top of this file for detailed description.
 */
void vStartPOSIXMaster(void *pvParams)
{
    pthread_t handshake_thread, dataflow_thread, routing_thread;

    /* Stream initializations should be handled by the devicetree library once that's set up */
    STREAM_INIT(STDIO, NULL, device_streams[0]);
    STREAM_INIT(STDIO, NULL, device_streams[1]);
    /* This is the global stream that is being used by handhsake and dataflow */
    STREAM_INIT(STDIO, NULL, stream);
    
    /* pthread_create(&handshake_thread, NULL, handshake_func, NULL); */
    pthread_create(&dataflow_thread, NULL, dataflow_func, NULL);
    /* pthread_create(&routing_thread, NULL, routing_func, NULL); */


    /* This function will be defined for the port
     * Each port will have a configuration file that includes details about the bus, 
     *     either in a custom format or in a devicetree format (preferred)
     * 
     */
    
    /* Add device-specific stream/thread declerations here, if needed */
    /* ... */
    
    /* This task was created with the native xTaskCreate() API function, so
       must not run off the end of its implementing thread. */
    /* vTaskDelete(NULL); */
}

static void *handshake_func(void * pvArgs)
{    
    printf("Handshake thread started %s", LINE_BREAK);
    /* for (;;) { */
	for (int dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT-1; dev_idx++) {
	    /* 
	     * With the new state machine structure, what is the correct condition 
	     * for performing a handshake?
	     *
	     * Answer: Handshake attempts will occur based on user-configured processor devices. 
	     * This ties in with the (new) grander philosophy at work here: a processor will have 
	     * two types of configurations of devices, viz. ones that are present in the 
	     * devicetree at boot time and others that will be hot-plugged during runtime. The 
	     * user will configure certain peripherals (in a TBD manner) to enable 
	     * hot-pluggability of peripherals on those devices. For example, a UART device could 
	     * be configured hot-pluggable, which would result in handshake being attempted at the 
	     * device every now and then. 

	     * This gets interesting when devices are configured as 
	     * both hot-pluggable and with statically configured peripherals at the device. For 
	     * example, an I2C device can have one or more peripherals configured on the bus in 
	     * the devicetree, but can also be configured for hot-pluggability. It is obvious that 
	     * not all devices can support dual configuration.. some mechanism will be needed to 
	     * detect device configuration errors at compile time. 
	     *
	     */
	    /* if (todo_hs_or_not_todo_hs(GET_ADDR_FROM_IDX(dev_idx))) { */
	    int hs_state = HS_STATE_0;
	    void **args;
	    args = malloc(sizeof(uint8_t*)*2);
	    
	    uint32_t dev_idx = 0; /*< Do something with this, not relevant anymore */
	    
	    while (hs_state != HS_STATE_FAIL && hs_state != HS_STATE_SUCCESS)
		hs_state = hs_jumptable[hs_state](stream, args);
	    
	    if (hs_state == HS_STATE_SUCCESS) {
		uint8_t MDR_len = ((uint8_t*)args[0])[0];
		/* Store the encoded MDR buffer */
		MDR_rbuf[dev_idx] = malloc(sizeof(uint8_t)*MDR_len);
		memcpy(MDR_rbuf[dev_idx], args[1], MDR_len);
		free(args[1]);
		
		MDR_len_buf[dev_idx] = &MDR_len;
		
		/* Attempt to decode the protobuf, and add devices */
		s2m_MDR_response MDR_res_message = s2m_MDR_response_init_default;
		MDR_res_message.subscriptions.funcs.decode = decode_subscriptions_callback;
		MDR_res_message.subscriptions.arg = (void*)dev_idx;
		pb_istream_t MDR_res_stream = pb_istream_from_buffer(MDR_rbuf[dev_idx], MDR_len);
		if (!pb_decode(&MDR_res_stream, s2m_MDR_response_fields, &MDR_res_message)) {
		    printf("decode fail\n");
		    hs_state = HS_STATE_FAIL;
		}
		else {
		    printf("decode done\n");
		    device_info[dev_idx]              = malloc(sizeof(device_info_t));
		    device_info[dev_idx]->i2c_addr    = 0x0;
		    device_info[dev_idx]->device_id   = dev_idx;
		    device_info[dev_idx]->MDR         = MDR_res_message;
		    hs_state = HS_REGISTERED;
		}
		/* } */
		
		/* TODO This is slightly redundant now, so fix this */
		/* dev_sts[dev_idx] = get_state_from_hs_status(GET_ADDR_FROM_IDX(dev_idx), hs_state); */
	    }
	/* } */
    }
    return NULL;
}

/**
 * @brief Thread body: run the dataflow state machine until it finishes.
 *
 * Builds the argument vector shared by all dataflow state handlers and steps
 * df_jumptable, starting at DF_IDLE, until DF_STATE_FAIL or DF_STATE_SUCCESS
 * is reached.
 *
 * Argument vector layout (slots 0..11):
 *   0  &SOR_code (local, initialised to SLAVE_TX)
 *   1  pointer-sized heap cell, not bound to any global
 *   2  &data_rbuf          3  &data_routing_ptr
 *   4  &data_src_idx_rbuf  5  &data_len_buf
 *   6  &cmd_routing_buf    7  &cmd_routing_ptr
 *   8  &cmd_src_idx_rbuf   9  &cmd_dst_idx_rbuf
 *   10 &MDR_rbuf           11 &rbuf_idx (local)
 *
 * @param pvArgs Unused.
 * @return Always NULL.
 */
static void *dataflow_func(void *pvArgs)
{
    /* Twelve slots are used (0..11); the vector must be sized accordingly. */
    enum { DF_ARG_SLOTS = 12 };

    (void)pvArgs;
    printf("Dataflow thread started %s", LINE_BREAK);

    void **args = calloc(DF_ARG_SLOTS, sizeof *args);
    if (args == NULL) {
        printf("dataflow: out of memory %s", LINE_BREAK);
        return NULL;
    }

    /* Slot 1 is the only one that keeps its own heap cell; every other slot
     * points at a global or a local, so no per-slot allocation is needed. */
    args[1] = malloc(sizeof(uint8_t*));
    if (args[1] == NULL) {
        printf("dataflow: out of memory %s", LINE_BREAK);
        free(args);
        return NULL;
    }

    uint8_t rbuf_idx = 0;
    uint8_t SOR_code = SLAVE_TX;

    args[0] = &SOR_code;
    args[2] = &data_rbuf;
    args[3] = &data_routing_ptr;
    args[4] = &data_src_idx_rbuf;
    args[5] = &data_len_buf;
    args[6] = &cmd_routing_buf;
    args[7] = &cmd_routing_ptr;
    args[8] = &cmd_src_idx_rbuf;
    args[9] = &cmd_dst_idx_rbuf;
    args[10] = &MDR_rbuf;
    args[11] = &rbuf_idx;

    int df_state = DF_IDLE;
    while (df_state != DF_STATE_FAIL && df_state != DF_STATE_SUCCESS)
        df_state = df_jumptable[df_state](stream, args);

    /* NOTE(review): whether the state handlers replace or free args[1] is not
     * visible from this file, so it is deliberately left alone here. */
    free(args);

    printf("Dataflow thread ended %s", LINE_BREAK);
    return NULL;
}

/**
 * @brief Thread body: keep running data routing followed by command routing.
 *
 * The loop never terminates; the results of routing() and cmd_routing() are
 * not inspected.
 *
 * @param pvArgs Unused.
 * @return Never returns in practice (NULL is only there for the signature).
 */
static void *routing_func(void *pvArgs)
{
    printf("Routing thread started %s", LINE_BREAK);

    while (true) {
        routing();
        cmd_routing();
    }

    return NULL;
}

/**
 * @brief Route every pending datapoint to the devices subscribed to its source.
 *
 * Pass 1 builds, for each pending entry, a bitmap of destination devices: a
 * device is a destination when one of its subscribed module ids equals the
 * module id of the entry's source device. Pass 2 walks the bitmap (the actual
 * send is still a TODO) and frees the entry. Finally the data buffer is marked
 * empty by resetting data_routing_ptr.
 *
 * @return Always true.
 */
bool routing(void)
{
    /* This table holds information on where to send each datapoint in the routing buffer  */
    uint32_t routing_table[ROUTING_BUFSIZE][4] = {{0}};

    /* Build table with routing information. The index is as wide as
     * data_routing_ptr and is additionally bounded by the table size. */
    for (uint32_t rbuf_data_idx = 0;
         rbuf_data_idx < data_routing_ptr && rbuf_data_idx < ROUTING_BUFSIZE;
         rbuf_data_idx++) {
        uint8_t src_module_idx = data_src_idx_rbuf[rbuf_data_idx];

        /* A datapoint from an unknown / unregistered source cannot be
         * matched against subscriptions; it is still freed in pass 2. */
        if (src_module_idx >= BUS_DEVICE_LIMIT || device_info[src_module_idx] == NULL) {
            continue;
        }

        for (uint8_t dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT; dev_idx++) {
            if ((GET_BIT_FROM_IDX(allocated, dev_idx)) == 0) { /* No module at this index */
                continue;
            }
            for (uint8_t dev_sub_idx = 0;
                 dev_sub_idx < subs_info[dev_idx]->mod_idx; dev_sub_idx++) {
                if (subs_info[dev_idx]->module_ids[dev_sub_idx] ==
                    device_info[src_module_idx]->MDR.module_id) {
                    SET_BIT_FROM_IDX(routing_table[rbuf_data_idx], dev_idx);
                    break; /* one match is enough for this device */
                }
            }
            /* TODO entity ID, I2C addr and class routing, should go in the if condition above */
        }
    }

    for (uint32_t rbuf_data_idx = 0;
         rbuf_data_idx < data_routing_ptr && rbuf_data_idx < ROUTING_BUFSIZE;
         rbuf_data_idx++) {
        for (uint8_t device_idx = 0; device_idx < BUS_DEVICE_LIMIT; device_idx++) {
            if ((GET_BIT_FROM_IDX(allocated, device_idx)) &&
                (GET_BIT_FROM_IDX(routing_table[rbuf_data_idx], device_idx))) {
                /* TODO modify this to work with state machine */
                /* device_dataflow(GET_ADDR_FROM_IDX(device_idx), SLAVE_RX_DATAPOINT, rbuf_data_idx); */
            }
        }
        free(data_rbuf[rbuf_data_idx]);
        data_rbuf[rbuf_data_idx] = NULL; /* do not leave a stale pointer behind */
    }

    /* Reset the routing pointer, since all data in buffer should have been routed */
    data_routing_ptr = 0;
    return true;
}

bool cmd_routing(void)
{
    /* uint32_t routing_table[4]; */
    for (uint8_t rbuf_cmd_idx = 0; rbuf_cmd_idx < cmd_routing_ptr; rbuf_cmd_idx++) {
	uint8_t dst_module_idx = cmd_dst_idx_rbuf[rbuf_cmd_idx];
	for (int dev_idx = 0; dev_idx < BUS_DEVICE_LIMIT; dev_idx++) {
	    if (GET_BIT_FROM_IDX(allocated, dev_idx) &&
		device_info[dev_idx]->MDR.module_id == dst_module_idx) {
	    	/* device_dataflow(GET_ADDR_FROM_IDX(dev_idx), SLAVE_RX_COMMAND, rbuf_cmd_idx); */
	    }
	}
	free(cmd_routing_buf[rbuf_cmd_idx]);
    }
    cmd_routing_ptr = 0;
    return true;
}

bool todo_hs_or_not_todo_hs(uint8_t i2c_addr)
{
    uint8_t device_idx = GET_IDX_FROM_ADDR(i2c_addr);
    state_t device_curr_state = dev_sts[device_idx];
    bool do_hs = false;
    switch(device_curr_state) {
    case NO_HS:
    case CONNECTED:
    case FAILED:
    case OFFLINE:
	do_hs = true;
	break;
    case REGISTERED:
    case NO_DATA:
	do_hs = false;
	break;	
    }
    return do_hs;
}

/**
 * @brief Translate the outcome of a handshake into a device state.
 *
 * @param device_addr Unused.
 * @param hs_status   Handshake status to translate.
 * @return REGISTERED for HS_REGISTERED; FAILED for HS_MDR_ACK, HS_MDR_CTS and
 *         HS_MDR_MDR; OFFLINE for IDLE, HS_FAILED and every other value.
 */
state_t get_state_from_hs_status(uint16_t device_addr, hs_status_t hs_status)
{
    (void)device_addr;

    switch (hs_status) {
    case HS_REGISTERED:
        return REGISTERED;

    case HS_MDR_ACK:
    case HS_MDR_CTS:
    case HS_MDR_MDR:
        return FAILED;

    case IDLE:
    case HS_FAILED:
    default:
        return OFFLINE;
    }
}


/**
 * @brief Protobuf decode callback for one `subscriptions` entry of an MDR.
 *
 * The device slot is carried in the callback argument itself: the caller
 * stores the index as an integer converted to a pointer, and the callback
 * receives the address of that pointer. On the first call for a slot the
 * subscription record is allocated and the slot's bit in `allocated` is set.
 * The entry is then decoded and each field that is present is appended to the
 * matching list of the record.
 *
 * @param istream Stream positioned at the submessage to decode.
 * @param field   Unused.
 * @param args    Address of the callback argument holding the slot index.
 * @return false if the index is out of range, allocation fails or decoding
 *         fails; true otherwise.
 */
bool decode_subscriptions_callback(pb_istream_t *istream, const pb_field_t *field, void **args)
{
    (void)field;

    if (args == NULL) {
        return false;
    }

    /* Read the index back out of the pointer value rather than reinterpreting
     * the pointer's storage as an int, which depends on byte order and size. */
    const uintptr_t raw_idx = (uintptr_t)*args;
    if (raw_idx >= BUS_DEVICE_LIMIT) {
        return false; /* no such slot in subs_info */
    }
    const uint32_t slot = (uint32_t)raw_idx;

    /* Check if storage is allocated; if not, allocate it */
    if ((GET_BIT_FROM_IDX(allocated, slot)) == 0) {
        subscription_info_t *fresh = malloc(sizeof *fresh);
        if (fresh == NULL) {
            return false;
        }
        fresh->mod_idx = 0;
        fresh->entity_idx = 0;
        fresh->class_idx = 0;
        fresh->i2c_idx = 0;
        subs_info[slot] = fresh;
        SET_BIT_FROM_IDX(allocated, slot);
    }
    subscription_info_t *info = subs_info[slot];

    _subscriptions subs;
    if (!pb_decode(istream, _subscriptions_fields, &subs)) {
        return false;
    }

    /* Parse all fields if they're included.
     * NOTE(review): the capacity of the four lists is not visible from this
     * file, so the append indices are not bounded here - confirm against the
     * subscription_info_t definition. */
    if (subs.has_module_id) {
        info->module_ids[info->mod_idx++] = subs.module_id;
    }
    if (subs.has_entity_id) {
        info->entity_ids[info->entity_idx++] = subs.entity_id;
    }
    if (subs.has_module_class) {
        info->module_class[info->class_idx++] = subs.module_class;
    }
    if (subs.has_i2c_address) {
        info->i2c_address[info->i2c_idx++] = subs.i2c_address;
    }
    return true;
}

/**
 * @brief Protobuf encode callback that emits only the tag of the field.
 *
 * No payload is written after the tag.
 *
 * @param ostream Output stream to write to.
 * @param field   Field whose tag is encoded.
 * @param arg     Unused.
 * @return true if the tag was written, false otherwise.
 */
bool master_encode_MDR_callback(pb_ostream_t *ostream, const pb_field_t *field, void * const *arg)
{
    const bool tag_written = pb_encode_tag_for_field(ostream, field);
    return tag_written;
}