```python
import trio

work_available = trio.Event()

async def get_work():
    while True:
        work = check_for_work()
        if not work:
            await work_available.wait()
        else:
            return work

def add_work_to_pile(...):
    global work_available
    ...
    if work_available.statistics().tasks_waiting:
        work_available.set()
        work_available = trio.Event()
```
In this Python-like code example I get work in bursts via `add_work_to_pile()`. The workers, which get work via `get_work()`, are slow, so most of the time when `add_work_to_pile()` is called there will be no one waiting on `work_available`.
Which is better/cleaner/simpler/more pythonic/more trionic/more intended by the trio developers?

- checking whether anyone is waiting on the `Event()` via `statistics().tasks_waiting`, like in the example code, ...or...
- unconditionally `set()`ting the `Event()` and creating a new one each time? (Most of them in vain.)
Furthermore... the API does not really seem to expect regular code to check whether someone is waiting via this `statistics()` call...
I don't mind spending a couple more lines to make things clearer. But that goes both ways: a few more CPU cycles are fine in exchange for simpler code...
CodePudding user response:
I think you might want a `trio.ParkingLot`. It gives more control over parking (which is like `Event.wait()`) and unparking (which is like `Event.set()`, except that it doesn't stop future parkers from waiting). But it doesn't have any notion of being "set" at all, so you would need to store that information separately. If your work is naturally truthy when present (e.g. a non-empty list) then that might be easy anyway. Example:
```python
available_work = []
available_work_pl = trio.ParkingLot()

async def get_work():
    while not available_work:
        await available_work_pl.park()
    result = list(available_work)
    available_work.clear()
    return result

def add_work_to_pile(foo):
    available_work.append(foo)
    available_work_pl.unpark()
```
Edit: Replaced `if` with `while` in `get_work()`. I think `if` has a race condition: if there are two parked tasks and then `add_work_to_pile()` gets called twice, one `get_work()` could grab both work items while the other would still be unparked and would return an empty list. Using `while` instead makes it loop back around until more data is added.
CodePudding user response:
Creating a new `Event` is roughly the same cost as creating the `_EventStatistics` object within the `statistics` method, so you'd need to profile your own code to pick out any small difference in performance. However, although it is safe and performant, the intent of `statistics` across trio's classes is debugging rather than core logic. Creating and discarding many `Event` instances would be more in line with the intent of the devs.
A more trionic pattern would be to load each work item into a buffered memory channel in place of your `add_work_to_pile()` function, and then iterate on that channel in the task that awaits `get_work`. The amount of code is comparable to your example:
```python
import trio

send_chan, recv_chan = trio.open_memory_channel(float('inf'))

async def task_that_uses_work_items():
    # # compare
    # while True:
    #     work = await get_work()
    #     handle_work(work)
    async for work in recv_chan:
        handle_work(work)

def add_work_to_pile():
    ...
    for work in new_work_set:
        send_chan.send_nowait(work)

# maybe your work is coming in from a thread?
# (trio_token would be captured inside the trio run
# via trio.lowlevel.current_trio_token())
def add_work_from_thread():
    ...
    for work in new_work_set:
        trio_token.run_sync_soon(send_chan.send_nowait, work)
```
Furthermore, it's performant, because the work items are efficiently rotated through a `deque` internally. This code checkpoints for every work item, so you may have to jump through some hoops if you want to avoid that.