diff options
Diffstat (limited to 'tests/multi_bluetooth/ble_mtu.py')
| -rw-r--r-- | tests/multi_bluetooth/ble_mtu.py | 54 |
1 files changed, 21 insertions, 33 deletions
diff --git a/tests/multi_bluetooth/ble_mtu.py b/tests/multi_bluetooth/ble_mtu.py index ef3199c5b..73c77e246 100644 --- a/tests/multi_bluetooth/ble_mtu.py +++ b/tests/multi_bluetooth/ble_mtu.py @@ -51,7 +51,6 @@ waiting_events = {} def irq(event, data): - global value_handle if event == _IRQ_CENTRAL_CONNECT: print("_IRQ_CENTRAL_CONNECT") waiting_events[event] = data[0] @@ -68,6 +67,8 @@ def irq(event, data): if data[-1] == CHAR_UUID: print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1]) waiting_events[event] = data[2] + else: + return elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: print("_IRQ_GATTC_CHARACTERISTIC_DONE") elif event == _IRQ_GATTC_WRITE_DONE: @@ -86,9 +87,11 @@ def wait_for_event(event, timeout_ms): t0 = time.ticks_ms() while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms: if event in waiting_events: - return True + result = waiting_events[event] + del waiting_events[event] + return result machine.idle() - return False + raise ValueError("Timeout waiting for {}".format(event)) # Acting in peripheral role. @@ -101,8 +104,6 @@ def instance0(): multitest.next() try: for i in range(7): - waiting_events.clear() - if i == 1: ble.config(mtu=200) elif i == 2: @@ -116,31 +117,26 @@ def instance0(): ble.config(mtu=256) # Wait for central to connect to us. - if not wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS): - return - conn_handle = waiting_events[_IRQ_CENTRAL_CONNECT] + conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS) if i >= 4: print("gattc_exchange_mtu") ble.gattc_exchange_mtu(conn_handle) - if not wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS): - return - mtu = waiting_events[_IRQ_MTU_EXCHANGED] + mtu = wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS) print("gatts_notify") ble.gatts_notify(conn_handle, char_handle, str(i) * 64) - if not wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS): - return + # Extra timeout while client does service discovery. + wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS * 2) print("gatts_read") data = ble.gatts_read(char_handle) print("characteristic len:", len(data), chr(data[0])) # Wait for the central to disconnect. - if not wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS): - return + wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS) print("gap_advertise") ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY") @@ -154,8 +150,6 @@ def instance1(): multitest.next() try: for i in range(7): - waiting_events.clear() - if i < 4: ble.config(mtu=300) elif i == 5: @@ -166,39 +160,33 @@ def instance1(): ble.config(mtu=256) # Connect to peripheral and then disconnect. + # Extra scan timeout allows for the peripheral to receive the previous disconnect + # event and start advertising again. print("gap_connect") - ble.gap_connect(*BDADDR) - if not wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS): - return - conn_handle = waiting_events[_IRQ_PERIPHERAL_CONNECT] + ble.gap_connect(BDADDR[0], BDADDR[1], 5000) + conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS) if i < 4: print("gattc_exchange_mtu") ble.gattc_exchange_mtu(conn_handle) - if not wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS): - return - mtu = waiting_events[_IRQ_MTU_EXCHANGED] + mtu = wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS) - if not wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS): - return + wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS) print("gattc_discover_characteristics") ble.gattc_discover_characteristics(conn_handle, 1, 65535) - if not wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS): - return - value_handle = waiting_events[_IRQ_GATTC_CHARACTERISTIC_RESULT] + value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS) + wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS) # Write 20 more than the MTU to test truncation. print("gattc_write") ble.gattc_write(conn_handle, value_handle, chr(ord("a") + i) * (mtu + 20), 1) - if not wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS): - return + wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS) # Disconnect from peripheral. print("gap_disconnect:", ble.gap_disconnect(conn_handle)) - if not wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS): - return + wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS) finally: ble.active(0) |
