diff options
Diffstat (limited to 'tests/multi_bluetooth/ble_gatt_data_transfer.py')
| -rw-r--r-- | tests/multi_bluetooth/ble_gatt_data_transfer.py | 62 |
1 files changed, 22 insertions, 40 deletions
diff --git a/tests/multi_bluetooth/ble_gatt_data_transfer.py b/tests/multi_bluetooth/ble_gatt_data_transfer.py index 944c9e2d2..e8249b3da 100644 --- a/tests/multi_bluetooth/ble_gatt_data_transfer.py +++ b/tests/multi_bluetooth/ble_gatt_data_transfer.py @@ -29,35 +29,33 @@ CHAR_TX = (CHAR_TX_UUID, bluetooth.FLAG_NOTIFY) SERVICE = (SERVICE_UUID, (CHAR_CTRL, CHAR_RX, CHAR_TX)) SERVICES = (SERVICE,) -waiting_event = None -waiting_data = None -ctrl_value_handle = 0 -rx_value_handle = 0 -tx_value_handle = 0 +waiting_events = {} def irq(event, data): - global waiting_event, waiting_data, ctrl_value_handle, rx_value_handle, tx_value_handle if event == _IRQ_CENTRAL_CONNECT: print("_IRQ_CENTRAL_CONNECT") + waiting_events[event] = data[0] elif event == _IRQ_CENTRAL_DISCONNECT: print("_IRQ_CENTRAL_DISCONNECT") elif event == _IRQ_GATTS_WRITE: print("_IRQ_GATTS_WRITE") + waiting_events[(event, data[1])] = None elif event == _IRQ_PERIPHERAL_CONNECT: print("_IRQ_PERIPHERAL_CONNECT") + waiting_events[event] = data[0] elif event == _IRQ_PERIPHERAL_DISCONNECT: print("_IRQ_PERIPHERAL_DISCONNECT") elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: if data[-1] == CHAR_CTRL_UUID: print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1]) - ctrl_value_handle = data[2] + waiting_events[(event, CHAR_CTRL_UUID)] = data[2] elif data[-1] == CHAR_RX_UUID: print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1]) - rx_value_handle = data[2] + waiting_events[(event, CHAR_RX_UUID)] = data[2] elif data[-1] == CHAR_TX_UUID: print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1]) - tx_value_handle = data[2] + waiting_events[(event, CHAR_TX_UUID)] = data[2] elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: print("_IRQ_GATTC_CHARACTERISTIC_DONE") elif event == _IRQ_GATTC_READ_RESULT: @@ -68,26 +66,21 @@ def irq(event, data): print("_IRQ_GATTC_WRITE_DONE", data[-1]) elif event == _IRQ_GATTC_NOTIFY: print("_IRQ_GATTC_NOTIFY", bytes(data[-1])) + waiting_events[(event, data[1])] = None - if waiting_event is not None: - if (isinstance(waiting_event, int) and event == waiting_event) or ( - not isinstance(waiting_event, int) and waiting_event(event, data) - ): - waiting_event = None - waiting_data = data + if event not in waiting_events: + waiting_events[event] = None def wait_for_event(event, timeout_ms): - global waiting_event, waiting_data - waiting_event = event - waiting_data = None - t0 = time.ticks_ms() while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms: - if waiting_data: - return True + if event in waiting_events: + result = waiting_events[event] + del waiting_events[event] + return result machine.idle() - return False + raise ValueError("Timeout waiting for {}".format(event)) # Acting in peripheral role. @@ -103,15 +96,10 @@ def instance0(): multitest.next() try: # Wait for central to connect to us. - if not wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS): - return - conn_handle, _, _ = waiting_data + conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS) # Wait for the central to signal that it's done with its part of the test. - wait_for_event( - lambda event, data: event == _IRQ_GATTS_WRITE and data[1] == char_ctrl_handle, - 2 * TIMEOUT_MS, - ) + wait_for_event((_IRQ_GATTS_WRITE, char_ctrl_handle), 2 * TIMEOUT_MS) # Read all accumulated data from the central. print("gatts_read:", ble.gatts_read(char_rx_handle)) @@ -138,17 +126,14 @@ def instance1(): # Connect to peripheral and then disconnect. print("gap_connect") ble.gap_connect(*BDADDR) - if not wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS): - return - conn_handle, _, _ = waiting_data + conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS) # Discover characteristics. ble.gattc_discover_characteristics(conn_handle, 1, 65535) - wait_for_event( - lambda event, data: ctrl_value_handle and rx_value_handle and tx_value_handle, - TIMEOUT_MS, - ) wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS) + ctrl_value_handle = waiting_events[(_IRQ_GATTC_CHARACTERISTIC_RESULT, CHAR_CTRL_UUID)] + rx_value_handle = waiting_events[(_IRQ_GATTC_CHARACTERISTIC_RESULT, CHAR_RX_UUID)] + tx_value_handle = waiting_events[(_IRQ_GATTC_CHARACTERISTIC_RESULT, CHAR_TX_UUID)] # Write to the characteristic a few times, with and without response. for i in range(4): @@ -163,10 +148,7 @@ def instance1(): wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS) # Wait for notification that peripheral is done with its part of the test. - wait_for_event( - lambda event, data: event == _IRQ_GATTC_NOTIFY and data[1] == ctrl_value_handle, - TIMEOUT_MS, - ) + wait_for_event((_IRQ_GATTC_NOTIFY, ctrl_value_handle), TIMEOUT_MS) # Disconnect from peripheral. print("gap_disconnect:", ble.gap_disconnect(conn_handle)) |
