diff options
Diffstat (limited to 'tests/multi_bluetooth/ble_gap_connect.py')
| -rw-r--r-- | tests/multi_bluetooth/ble_gap_connect.py | 70 |
1 files changed, 23 insertions, 47 deletions
diff --git a/tests/multi_bluetooth/ble_gap_connect.py b/tests/multi_bluetooth/ble_gap_connect.py index 8b40a2916..2c1d2cbbc 100644 --- a/tests/multi_bluetooth/ble_gap_connect.py +++ b/tests/multi_bluetooth/ble_gap_connect.py @@ -10,101 +10,77 @@ _IRQ_CENTRAL_DISCONNECT = const(2) _IRQ_PERIPHERAL_CONNECT = const(7) _IRQ_PERIPHERAL_DISCONNECT = const(8) -central_connected = False -central_disconnected = False -peripheral_connected = False -peripheral_disconnected = False -conn_handle = None +waiting_events = {} def irq(event, data): - global central_connected, central_disconnected, peripheral_connected, peripheral_disconnected, conn_handle if event == _IRQ_CENTRAL_CONNECT: print("_IRQ_CENTRAL_CONNECT") - central_connected = True - conn_handle = data[0] + waiting_events[event] = data[0] elif event == _IRQ_CENTRAL_DISCONNECT: print("_IRQ_CENTRAL_DISCONNECT") - central_disconnected = True elif event == _IRQ_PERIPHERAL_CONNECT: print("_IRQ_PERIPHERAL_CONNECT") - peripheral_connected = True - conn_handle = data[0] + waiting_events[event] = data[0] elif event == _IRQ_PERIPHERAL_DISCONNECT: print("_IRQ_PERIPHERAL_DISCONNECT") - peripheral_disconnected = True - remote_addr = data[0] + if event not in waiting_events: + waiting_events[event] = None -def wait_for(fn, timeout_ms): + +def wait_for_event(event, timeout_ms): t0 = time.ticks_ms() while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms: - if fn(): - return True + if event in waiting_events: + result = waiting_events[event] + del waiting_events[event] + return result machine.idle() - return False + raise ValueError("Timeout waiting for {}".format(event)) # Acting in peripheral role. def instance0(): - global central_connected, central_disconnected - multitest.globals(BDADDR=ble.config("mac")) print("gap_advertise") ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY") multitest.next() try: # Wait for central to connect, then wait for it to disconnect. - if not wait_for(lambda: central_connected, TIMEOUT_MS): - return - if not wait_for(lambda: central_disconnected, TIMEOUT_MS): - return - - central_connected = False - central_disconnected = False + wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS) + wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS) # Start advertising again. print("gap_advertise") ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY") # Wait for central to connect, then disconnect it. - if not wait_for(lambda: central_connected, TIMEOUT_MS): - return + conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS) print("gap_disconnect:", ble.gap_disconnect(conn_handle)) - if not wait_for(lambda: central_disconnected, TIMEOUT_MS): - return + wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS) finally: ble.active(0) # Acting in central role. def instance1(): - global peripheral_connected, peripheral_disconnected - multitest.next() try: # Connect to peripheral and then disconnect. print("gap_connect") ble.gap_connect(*BDADDR) - if not wait_for(lambda: peripheral_connected, TIMEOUT_MS): - return + conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS) print("gap_disconnect:", ble.gap_disconnect(conn_handle)) - if not wait_for(lambda: peripheral_disconnected, TIMEOUT_MS): - return - - peripheral_connected = False - peripheral_disconnected = False - - # Wait for peripheral to start advertising again. - time.sleep_ms(100) + wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS) # Connect to peripheral and then let the peripheral disconnect us. + # Extra scan timeout allows for the peripheral to receive the disconnect + # event and start advertising again. print("gap_connect") - ble.gap_connect(*BDADDR) - if not wait_for(lambda: peripheral_connected, TIMEOUT_MS): - return - if not wait_for(lambda: peripheral_disconnected, TIMEOUT_MS): - return + ble.gap_connect(BDADDR[0], BDADDR[1], 5000) + wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS) + wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS) finally: ble.active(0) |
