Best way to avoid warning about un-run coroutines that are not-yet run by cancelled tasks?

Time:08-05

In the following, the coroutine runIt() is created and passed as an argument to delegate(...), which is wrapped in a Task that is cancelled before runIt executes:

import asyncio

async def cancelTaskTest():
    async def runIt():
        print("RunIt ran")

    async def delegate(coro):
        await coro

    task = asyncio.create_task(delegate(runIt()))
    task.cancel()

if __name__=='__main__':
    asyncio.run(cancelTaskTest())

Produces the unwanted warning:

/usr/lib/python3.10/asyncio/base_events.py:1881: RuntimeWarning: coroutine 'cancelTaskTest.<locals>.runIt' was never awaited
  handle = self._ready.popleft()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I'm aware that runIt did not run. I don't want a warning about it. What's the best way to avoid this?

CodePudding user response:

The simplest method is to remove the () after runIt and call it inside delegate() instead, so that the coroutine object is only created if delegate actually runs:

import asyncio


async def cancelTaskTest():
    async def runIt():
        print("RunIt ran")

    async def delegate(asyncFunc):
        coro = asyncFunc() # <-- put () here
        await coro  

    task = asyncio.create_task(delegate(runIt))  # <-- removed () in runIt
    task.cancel()


if __name__ == "__main__":
    asyncio.run(cancelTaskTest())
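
If the call site has to keep passing an already-created coroutine object, another option may be to close that object explicitly once the task has finished. The following is a minimal sketch, not part of the original answer: the name main and the try/finally layout are illustrative assumptions. It relies on coroutine.close(), which marks a never-started coroutine as finished so that it is not reported as never awaited, and which does nothing if the coroutine has already completed:

```python
import asyncio


async def runIt():
    print("RunIt ran")


async def delegate(coro):
    await coro


async def main():
    # Keep a reference to the coroutine object so it can be closed later.
    coro = runIt()
    task = asyncio.create_task(delegate(coro))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled before its first step
    finally:
        # Harmless if runIt already finished; if it never started,
        # closing it prevents the "was never awaited" warning.
        coro.close()
    return task.cancelled()


if __name__ == "__main__":
    asyncio.run(main())
```

The close() call lives in the caller because a delegate that is cancelled before its first step never enters its own body, so a try/finally placed inside delegate would not run.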

EDIT: To pass parameters to runIt, wrap the call in a plain lambda:

import asyncio


async def cancelTaskTest():
    async def runIt(p1, p2):
        print(f"RunIt({p1}, {p2})")

    async def delegate(asyncFunc):
        await asyncFunc()  # the coroutine is created only if delegate actually runs

    task = asyncio.create_task(delegate(lambda: runIt(1, 2)))
    task.cancel()


if __name__ == "__main__":
    asyncio.run(cancelTaskTest())
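
As a variation on the lambda, functools.partial from the standard library builds the same kind of zero-argument callable while binding the arguments up front. A minimal sketch under that assumption (the name main and the return values are illustrative, not from the original answer):

```python
import asyncio
import functools


async def runIt(p1, p2):
    print(f"RunIt({p1}, {p2})")
    return p1 + p2


async def delegate(asyncFunc):
    # The coroutine object is created here, only if delegate actually runs.
    return await asyncFunc()


async def main():
    # partial(runIt, 1, 2) is a plain callable; no coroutine exists yet.
    task = asyncio.create_task(delegate(functools.partial(runIt, 1, 2)))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "finished"


if __name__ == "__main__":
    asyncio.run(main())
```

Because runIt is never called when the task is cancelled before its first step, there is no orphaned coroutine object for the interpreter to warn about.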