How can I use asyncio.Condition within a Task in Python < v3.10

Time:05-12

Is there any reason why I can't use an asyncio.Condition within a Task?

c = asyncio.Condition()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    async with c:
        # RuntimeError: Task <Task pending coro=<a() running at ..this file..:13>> got Future <Future pending> attached to a different loop
        await c.wait()   #

async def main():
    asyncio.get_event_loop().create_task(a())
    await asyncio.sleep(2)

The error says "got Future attached to a different loop", but I don't think I created a new loop.

Full example here:

import asyncio

c = asyncio.Condition()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    async with c:
        # RuntimeError: Task <Task pending coro=<a() running at ..this file..:13>> got Future <Future pending> attached to a different loop
        await c.wait()   #
    print("A done")

async def b():
    await asyncio.sleep(2)
    print("B ..")
    async with c:
        c.notify_all()
    print("B done")
    await asyncio.sleep(1)

async def main():
    asyncio.get_event_loop().create_task(a())
    await b()

asyncio.run(main())

I see the same error with Python 3.7, 3.8 and 3.9.

CodePudding user response:

The docs for asyncio.Condition.notify_all() state:

The lock must be acquired before this method is called and released shortly after. If called with an unlocked lock a RuntimeError error is raised.

The lock gets released in a() when it calls c.wait(), so the Lock inside c is unlocked by the time you call c.notify_all().

You need to hold the lock before calling notify_all(). Using

 async with c:
     c.notify_all()

makes your example work as expected.
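
To see the rule from the docs in isolation, here is a minimal sketch. The function names are made up for illustration, and the Condition is created inside the coroutine so that the loop issue from the question does not get in the way:

```python
import asyncio

async def notify_without_lock():
    # Hypothetical helper: call notify_all() without holding the lock.
    cond = asyncio.Condition()
    try:
        cond.notify_all()
    except RuntimeError as exc:
        return str(exc)   # the docs promise a RuntimeError here
    return None

async def notify_with_lock():
    # Hypothetical helper: hold the lock while notifying.
    cond = asyncio.Condition()
    async with cond:
        cond.notify_all()  # no waiters yet, so this is simply a no-op
    return True

if __name__ == "__main__":
    print(asyncio.run(notify_without_lock()))
    print(asyncio.run(notify_with_lock()))
```

The first call should return the error message, the second should return True.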

CodePudding user response:

As @JanWilamowski points out, this seems to be a bug with Python < v3.10. Works fine in 3.10.

Booo!

It also seems that you can't work around this by switching from Condition to Queue (see below). I guess that is because, before 3.10, Queue also picks its event loop when it is constructed. Similarly, this also failed:

async def main():
    loop.create_task(a())
    ...

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
# asyncio.run(main())

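However, this does work, presumably because get_event_loop() returns the same default loop that the module-level Condition picked up when it was created: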

async def main():
    loop.create_task(a())
    ...

loop = asyncio.get_event_loop()   # <-- the only change
loop.run_until_complete(main())
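
The error message itself can be reproduced without any Condition, which suggests that the problem is about which loop owns the future being awaited. This is only a sketch: other_loop stands in for whatever loop the module-level Condition attached itself to, and demo() and await_future() are names invented here.

```python
import asyncio

async def await_future(fut):
    # Awaiting a pending future makes the running task check which loop owns it.
    await fut

def demo():
    # Stand-in for the loop that a module-level object bound to earlier.
    other_loop = asyncio.new_event_loop()
    fut = other_loop.create_future()
    try:
        # asyncio.run() always starts a fresh loop, so fut belongs to a different one.
        asyncio.run(await_future(fut))
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return None

if __name__ == "__main__":
    print(demo())
```

The returned message should contain "attached to a different loop", the same wording as in the question.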

Full working example below:

import asyncio
import time

USE_QUEUE = False

if not USE_QUEUE:
    c = asyncio.Condition()
else:
    q = asyncio.Queue()

async def a():
    print("A ..")
    # await asyncio.sleep(0.2) # This works
    if not USE_QUEUE:
        async with c:
            await c.wait()
    else:
        result = await q.get()
        q.task_done()
        print("result", result)
    print("A done", time.time())

async def b():
    await asyncio.sleep(1)
    print("B ..", time.time())
    if not USE_QUEUE:
        async with c:
            c.notify_all()
    else:
        await q.put(123)  # put() returns None, so there is nothing to keep
    await asyncio.sleep(1)
    print("B done")

async def main():
    loop.create_task(a())
    # asyncio.get_event_loop().create_task(a())
    await b()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
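
If you would rather keep asyncio.run(), another option that should also work on 3.7 to 3.9 is to create the Condition inside main(), while the loop is already running, and pass it to the coroutines. A minimal sketch, assuming the same a()/b() roles as above (renamed waiter() and notifier() here, with a list instead of print so the result can be inspected):

```python
import asyncio

async def waiter(cond, log):
    # Plays the role of a(): wait until somebody notifies the condition.
    async with cond:
        await cond.wait()
    log.append("A done")

async def notifier(cond, log):
    # Plays the role of b(): give the waiter time to start, then notify.
    await asyncio.sleep(0.1)
    async with cond:
        cond.notify_all()
    log.append("B done")

async def main():
    # Created inside the running loop, so it cannot bind to a different one.
    cond = asyncio.Condition()
    log = []
    task = asyncio.create_task(waiter(cond, log))
    await notifier(cond, log)
    await task  # make sure the waiter has finished before returning
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The only design change is that nothing loop-dependent is constructed at import time, which is why the Python version should no longer matter.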