To understand generator-based concurrency, I am following the talk by D. Beazley. I don't understand the purpose of future_monitor and was wondering what the consequences of removing it would be. Below is his implementation of the asynchronous server and, right after it, my implementation without the future_monitor function.
Perhaps I misunderstand how futures and add_done_callback work in the concurrent.futures library.
I do not know:
- what runs in the main process and what is delegated to another process;
- how add_done_callback interacts with the main process: will the callback run in the middle of the main thread whenever the future is done?
As I understand it:
- the function submitted to the pool is executed in a process other than the main process;
- the return value from that other process is stored in the future object when the function returns (I imagine a kind of message queue between the two processes, through which the future receives the result passed by the third process executing fib(n));
- add_done_callback is a non-blocking function that will suspend the main thread when the future is done and call the callback immediately (suspending the main process?).
# server.py
# Fib microservice

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

from socket import *
from collections import deque
from select import select
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool  # overrides the import above
import os
import psutil

def future_done(future):
    import pdb;pdb.set_trace()
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')

def future_monitor():
    while True:
        yield 'recv', future_event
        future_event.recv(100)

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No active tasks to run
            # wait for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        # Tasks left in tasks_queue
        task = tasks.popleft()
        try:
            why, what = next(task)   # Run to the yield
            if why == 'recv':
                # Must go wait somewhere
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                import pdb;pdb.set_trace()
                what.add_done_callback(future_done)
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()   # blocking
        print("Connection", addr, client)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)   # blocking
        print(f'Receive {req} from Client {client.getpeername()} ')
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()   # Blocks
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)   # blocking
        print(f'Send {resp} to Client {client.getpeername()} ')
    print("Closed")

if __name__=='__main__':
    #Parameters
    pool = Pool(4)
    recv_wait = { }   # Mapping sockets -> tasks (generators)
    send_wait = { }
    future_wait = { }
    future_notify, future_event = socketpair()
    tasks = deque()
    #Main tasks
    tasks.append(future_monitor())
    tasks.append(fib_server(('',25000)))
    #import pdb;pdb.set_trace()
    #Run Event loop
    run()

# server.py
# Fib microservice (my version, without future_monitor)

from socket import *
from fib import fib
from collections import deque
from select import select
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool  # overrides the import above

def future_done(future):
    import pdb;pdb.set_trace()
    tasks.append(future_wait.pop(future))

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # No active tasks to run
            # wait for I/O
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        # Tasks left in tasks_queue
        task = tasks.popleft()
        try:
            why, what = next(task)   # Run to the yield
            if why == 'recv':
                # Must go wait somewhere
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                import pdb;pdb.set_trace()
                what.add_done_callback(future_done)
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()   # blocking
        print("Connection", addr, client)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)   # blocking
        print(f'Receive {req} from Client {client.getpeername()} ')
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()   # Blocks
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)   # blocking
        print(f'Send {resp} to Client {client.getpeername()} ')
    print("Closed")

if __name__=='__main__':
    #Parameters
    pool = Pool(4)
    recv_wait = { }   # Mapping sockets -> tasks (generators)
    send_wait = { }
    future_wait = { }
    tasks = deque()
    #Main tasks
    tasks.append(fib_server(('',25000)))
    #import pdb;pdb.set_trace()
    #Run Event loop
    run()

Answer:
I perhaps have some elements of an answer.
To be clear, we will talk about two processes:
- the main process
- the fib process
The main process has two threads:
- the main thread
- the callback thread (which handles the future object)
add_done_callback is a non-blocking function (the callback runs in another, concurrent thread of the same process, see below), so while fib(n) executes, run() keeps progressing until it reaches the select statement, where it stays stuck in the "while not tasks" loop waiting for a polling/select event.
As I understand it, the callback registered with future.add_done_callback() simply runs in a thread other than the main thread, but in the same process (so communication is easy). We must be careful not to mix things up: the job fib(n) is submitted to another process, but the callback is called in the main process, in another thread (I suppose, as already mentioned, that the return value of the "fib process" is communicated to the future object in the main process through a kind of "process queue").
So even while run() is stuck in the select statement, the callback is executed concurrently and the "fib_handler" task is added to tasks.
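To check the claim that the callback does not run in the main thread, here is a minimal sketch. It is an illustration under stated assumptions, not the code from the talk: callback_thread_info, work and on_done are hypothetical names, and it uses ThreadPoolExecutor instead of ProcessPoolExecutor so it runs the same way on every platform. The concurrent.futures documentation states that callbacks are always called in a thread belonging to the process that added them. The Event only makes sure the job is still pending when the callback is registered, because a callback added to an already finished future is called immediately in the calling thread.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def callback_thread_info():
    """Report where a done-callback runs (hypothetical helper for illustration)."""
    gate = threading.Event()  # keeps the job pending until the callback is registered
    seen = {}

    def work():
        gate.wait()
        return 42

    def on_done(future):
        # Runs when the future completes, in the thread that completed it.
        seen["result"] = future.result()
        seen["in_main_thread"] = (
            threading.current_thread() is threading.main_thread()
        )

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(work)
        future.add_done_callback(on_done)
        gate.set()  # let the job finish now that the callback is attached
    # Leaving the "with" block waits for the worker, so on_done has already run.
    return seen


if __name__ == "__main__":
    print(callback_thread_info())
```

With a thread pool the callback runs in the worker thread; with a process pool it should run in a helper thread of the submitting process. Either way, the main thread is not interrupted: it keeps doing whatever it was doing, such as blocking in select.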
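When the callback thread (in the main process) returns, only the main thread is left, still stuck in the select statement and waiting for a recv or send event. So select returns, and the "while not tasks" loop exits (since the future_done callback has added a new task), only when another client connects or sends data. This is what future_monitor is for in the original server: future_done also sends a byte on future_notify, which makes future_event readable, so select returns as soon as the future completes instead of waiting for unrelated socket activity.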
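The effect of the notification socket can be reproduced in isolation. The sketch below is an assumption-laden illustration, not part of either server: select_wakes is a hypothetical helper, time.sleep stands in for fib(n), a thread pool replaces the process pool, and select gets a timeout (the servers above use none) so that the demonstration terminates.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from select import select
from socket import socketpair


def select_wakes(with_notify, timeout=2.0):
    """Return True if select() is woken before the timeout (hypothetical helper)."""
    notify, event = socketpair()
    try:
        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(time.sleep, 0.1)  # stand-in for fib(n)
            if with_notify:
                # Same idea as future_done + future_monitor: write one byte so
                # that the socket watched by select() becomes readable.
                future.add_done_callback(lambda f: notify.send(b"x"))
            readable, _, _ = select([event], [], [], timeout)
        if readable:
            event.recv(100)  # drain the wake-up byte
        return bool(readable)
    finally:
        notify.close()
        event.close()


if __name__ == "__main__":
    print("with notification:", select_wakes(True))
    print("without notification:", select_wakes(False, timeout=0.3))
```

Without the byte written by the callback, nothing makes the watched socket readable, so select only returns because of the timeout. In the server without future_monitor there is no timeout, which is why the finished future is only noticed once some other socket event arrives.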