Any reason why I can't use an asyncio.Condition within a Task?
c = asyncio.Condition()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    async with c:
        # RuntimeError: Task <Task pending coro=<a() running at ..this file..:13>> got Future <Future pending> attached to a different loop
        await c.wait() #

async def main():
    asyncio.get_event_loop().create_task(a())
    await asyncio.sleep(2)
The error says: "got Future attached to a different loop"
I don't think I created a new loop.
Full example here:
import asyncio

c = asyncio.Condition()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    async with c:
        # RuntimeError: Task <Task pending coro=<a() running at ..this file..:13>> got Future <Future pending> attached to a different loop
        await c.wait() #
    print("A done")

async def b():
    await asyncio.sleep(2)
    print("B ..")
    async with c:
        c.notify_all()
    print("B done")
    await asyncio.sleep(1)

async def main():
    asyncio.get_event_loop().create_task(a())
    await b()

asyncio.run(main())
I see the same error on Python 3.7, 3.8, and 3.9.
CodePudding user response:
The docs for asyncio.Condition.notify_all() state:
"The lock must be acquired before this method is called and released shortly after. If called with an unlocked lock a RuntimeError error is raised."
The lock gets released in a() on calling c.wait(), therefore the Lock inside c is unlocked when you call c.notify_all().
You need to hold the lock before calling notify_all(). Using
    async with c:
        c.notify_all()
makes your example work as expected.
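To see the rule from the docs in isolation, here is a minimal sketch. The function name demo and the outcome list are made up for illustration, and the Condition is created inside the coroutine so that it belongs to the running loop on any Python version:

```python
import asyncio


async def demo():
    # Created inside the coroutine so it is tied to the running loop.
    c = asyncio.Condition()
    outcome = []

    try:
        c.notify_all()  # lock is NOT held here
    except RuntimeError as exc:
        outcome.append(("unlocked", type(exc).__name__))

    async with c:
        c.notify_all()  # lock is held: allowed, even with no waiters
        outcome.append(("locked", "ok"))

    return outcome


result = asyncio.run(demo())
print(result)
```

The first call should raise RuntimeError because the lock is not held; the second call, inside async with c, should go through.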
CodePudding user response:
As @JanWilamowski points out, this seems to be a bug with Python < v3.10. Works fine in 3.10.
Booo!
It also seems that you can't work around this bug by switching from Condition to Queue (see below). I guess Queue gets tied to an event loop the same way Condition does. Similarly, this also failed:
async def main():
    loop.create_task(a())
    ...

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
# asyncio.run(main())
However, for reasons unclear, this does work:
async def main():
    loop.create_task(a())
    ...

loop = asyncio.get_event_loop()  # <-- get_event_loop() instead of new_event_loop()
loop.run_until_complete(main())
Full working example below:
import asyncio
import time

USE_QUEUE = False

# Created at module level, before any loop is running.
if not USE_QUEUE:
    c = asyncio.Condition()
else:
    q = asyncio.Queue()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    if not USE_QUEUE:
        async with c:
            await c.wait()
    else:
        result = await q.get()
        q.task_done()
        print("result", result)
    print("A done", time.time())

async def b():
    await asyncio.sleep(1)
    print("B ..", time.time())
    if not USE_QUEUE:
        async with c:
            c.notify_all()
    else:
        await q.put(123)  # put() returns None, so there is nothing to keep
    await asyncio.sleep(1)
    print("B done")

async def main():
    loop.create_task(a())
    # asyncio.get_event_loop().create_task(a())
    await b()

# Run on the default loop rather than a new one.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
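A possible explanation, as far as I can tell: before 3.10, asyncio.Condition() picked up an event loop at the moment it was constructed, so a Condition created at module level ended up on a different loop from the one asyncio.run() or new_event_loop() creates. Under that assumption, creating the Condition inside the running coroutine should avoid the error on older versions too, while still using asyncio.run(). A minimal sketch (passing c and a log list as parameters is my own choice for illustration, not part of the original code):

```python
import asyncio


async def a(c, log):
    log.append("A ..")
    async with c:
        await c.wait()  # releases the lock while waiting
    log.append("A done")


async def b(c, log):
    await asyncio.sleep(0.1)
    log.append("B ..")
    async with c:
        c.notify_all()  # lock is held here, as the docs require
    log.append("B done")


async def main():
    # Constructed while the loop is already running, so the Condition
    # and the tasks that use it all belong to the same loop.
    c = asyncio.Condition()
    log = []
    task = asyncio.create_task(a(c, log))
    await b(c, log)
    await task  # make sure a() has finished before returning
    return log


log = asyncio.run(main())
print(log)
```

The design choice here is simply to avoid module-level asyncio objects and hand them to the coroutines that need them.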