I have been struggling with a problem for a while. Setup: Python 3.8.8, Anaconda, Spyder.
I want to connect my Arduino Nano 33 BLE to a PC using the bleak library in Python. This requires some knowledge of the asyncio library.
BUFFER_LENGHT = 13
PACKET_NUMBER = BUFFER_LENGHT*2
address = "04:56:14:27:55:E8"
MODEL_NBR_UUID = "0000101a-0000-1000-8000-00805f9b34fb"

def process_data(dati):
    data = np.array(struct.unpack('H'*BUFFER_LENGHT,dati))
    print('_____________DATA_____________')
    print(data)
    print('_____________END______________')

def shutdown():
    client.disconnect()
    print('_____________INTERRUPTED_____________')

async def main(address,loop):
    global start_timestamp, stream_queue,client
    client = BleakClient(address)
    while await client.is_connected()==False:
        try:
            await client.connect()
        except Exception as e:
            print(e)
    try:
        time.sleep(1)
        start_timestamp=datetime.timestamp(datetime.now())
        while True:
            dati = await client.read_gatt_char(MODEL_NBR_UUID)
            process_data(dati)
            time.sleep(2)
    except Exception as e:
        print(e)
    else:
        await client.stop_notify(MODEL_NBR_UUID)
        await client.disconnect()

try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(address, loop))
    loop.close()
except KeyboardInterrupt:
    print("Process interrupted")
    loop.run_until_complete(shutdown())
    loop.close()

if __name__ == "__main__":
    main()
So, after importing all the libraries, I want to read some data from the Arduino using client.read_gatt_char(MODEL_NBR_UUID) and then unpack it. My problem is that I cannot stop the asyncio loop. I expected Ctrl+C to stop the code, but it does not work and I could not figure out why.
Are there any mistakes in the try/except structure? Or, more likely, am I using the asyncio functions incorrectly?
Could anyone help me? It would be really appreciated.
CodePudding user response:
OK, it was a Spyder thing after all. I copied your code, and after some fixing and taking out the BleakClient stuff it worked, so I decided to download Spyder.
Spyder is apparently based on IPython and has some peculiarities that can be unfamiliar when coming from plain Python. I'm not sure how you got that code to run in Spyder in the first place, because run_until_complete raises a RuntimeError when it's called in a cell in Spyder (most likely because the IPython kernel already has an event loop running).
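The RuntimeError can be reproduced in plain Python. This is only a minimal sketch, assuming the cause in Spyder is that a loop is already running when the cell executes; the helper names inner and outer are made up for illustration:

```python
import asyncio


async def inner():
    return 42


async def outer():
    # We are inside a running event loop here, much like code in a Spyder cell.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        loop.run_until_complete(coro)  # not allowed while the loop is running
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        message = str(exc)
    else:
        message = "no error"
    # Inside a running loop, awaiting is the way to run a coroutine.
    value = await inner()
    return message, value


result = asyncio.run(outer())
print(result)
```

The first element of the result holds the error text from run_until_complete, the second the value obtained by simply awaiting the coroutine.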
But with some modifications you can run your script in Spyder:
import asyncio
from datetime import datetime
from asyncio.exceptions import CancelledError

BUFFER_LENGHT = 13
PACKET_NUMBER = BUFFER_LENGHT * 2
address = "04:56:14:27:55:E8"
MODEL_NBR_UUID = "0000101a-0000-1000-8000-00805f9b34fb"

def process_data(dati):
    data = dati
    print("_____________DATA_____________")
    print(data)
    print("_____________END______________")

async def shutdown():
    # client.disconnect()
    print("_____________INTERRUPTED_____________")

async def main(address):
    global start_timestamp, stream_queue, client
    # client = BleakClient(address)
    try:
        # await client.connect()
        print("connect")
    except Exception as e:
        print(e)
    try:
        start_timestamp = datetime.timestamp(datetime.now())
        while True:
            dati = "some data"
            process_data(dati)
            await asyncio.sleep(2)
    except CancelledError:  # KeyboardInterrupt is raised as CancelledError
        print("cancelled")
    finally:  # your else-block was unreachable code, use finally for finalizing stuff
        print("stop notify")  # your clean-up code here
        print("disconnect")

async def start():  # new main entry point
    await main(address)
This should work out of the box for you to play with. Run the file with the "play" button, and then you can call your functions from the IPython console:
[2] await start()
This will start your script. You should now be able to stop it with the "stop" button or with Ctrl+C inside the running cell.
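What happens on stop can also be tried outside Spyder. The following is a rough sketch, assuming (as the comment in the script says) that the interrupt reaches the coroutine as a CancelledError; here the cancellation is triggered by hand with task.cancel(), and the events list and the worker/demo names exist only for illustration:

```python
import asyncio

events = []  # records what happened, for illustration only


async def worker():
    try:
        while True:
            events.append("read")
            # asyncio.sleep yields to the loop; time.sleep would block it
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # re-raise so the task is really marked as cancelled
    finally:
        events.append("cleanup")  # stop notify / disconnect would go here


async def demo():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.05)  # let the worker loop a few times
    task.cancel()  # stands in for the "stop" button (assumption)
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(demo())
print(events[-2:], was_cancelled)
```

The cancellation can only be delivered at an await, which is why the time.sleep calls in the question's loop had to become await asyncio.sleep, and why clean-up belongs in finally rather than else.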
This stripped-down version is far from perfect, but it should give you a starting point for reinserting your BleakClient logic.
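For the reinsertion, something along these lines might work. It is an untested sketch (I have no board at hand), and it assumes that the characteristic returns 13 little-endian uint16 values per read and that BleakClient is used as an async context manager; read_loop and its max_reads parameter are hypothetical helpers, not part of bleak:

```python
import asyncio
import struct

BUFFER_LENGHT = 13  # number of uint16 values per read, as in the question
address = "04:56:14:27:55:E8"
MODEL_NBR_UUID = "0000101a-0000-1000-8000-00805f9b34fb"


def process_data(dati):
    # "<" means little-endian without padding; the byte order is an
    # assumption about what the Arduino sketch sends.
    return struct.unpack(f"<{BUFFER_LENGHT}H", dati)


async def read_loop(client, uuid, period=2.0, max_reads=None):
    """Poll one characteristic until cancelled (max_reads is only for testing)."""
    count = 0
    while max_reads is None or count < max_reads:
        dati = await client.read_gatt_char(uuid)
        print(process_data(dati))
        count += 1
        await asyncio.sleep(period)  # non-blocking, unlike time.sleep
    return count


async def start():
    from bleak import BleakClient  # imported lazily; needs bleak installed

    # The context manager connects on entry and disconnects on exit,
    # also when the coroutine is cancelled from Spyder.
    async with BleakClient(address) as client:
        await read_loop(client, MODEL_NBR_UUID)
```

As before, run the file and then call await start() from the IPython console. Because the disconnect is handled by the context manager, no explicit clean-up branch is needed in the loop itself.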