Home > Enterprise >  pass variables between poolWorker s
pass variables between poolWorker s

Time:05-25

I have a pool of 2 processes. Process #1 with infinite loop. I need to stop infinite loop in process #1 when something happens in process #2. How to pass info from process #1 to process #2?

def do_smth(value):
  a = 0
  if value == "1":
    while 1:
      time.sleep(0.5)
      print("process_1", a)
      if a == 10: break
  if value == "2":
    while a < 10:
      time.sleep(0.5)
      print("process_2", a)
      a  =1

def make_a_pool(all):
  with multiprocessing.Pool(processes=2) as pool:
      pool.map(do_smth, all)

if __name__ == "__main__":
    all = ["1", "2"]
    make_a_pool(all)

CodePudding user response:

If what you wish is to share a full variable, and not just a stop condition for an infinite loop, you may use multiprocessing.Value(). Keep in mind you have to initialize the value differently, as multiprocessing.Pool cannot pass around synchronization primitives that can't be pickled:

import functools
import multiprocessing
import time

def initialize_a(a_):
    global a
    a = a_

def do_smth(value):
  if value == "1":
    while True:
      time.sleep(0.5)
      print("process_1", a.value)
      if a.value >= 10: break
  if value == "2":
    while a.value < 10:
      time.sleep(0.5)
      print("process_2", a.value)
      a.value  =1

def make_a_pool(all):
  a = multiprocessing.Value("i")
  a.value = 0
  with multiprocessing.Pool(processes=2,
                            initializer=initialize_a, initargs=(a,)) as pool:
    pool.map(do_smth, all)

if __name__ == "__main__":
    all = ["1", "2"]
    make_a_pool(all)

Output:

process_2 0
process_1 0
process_1 1
process_2 1
process_2 2
process_1 2
process_1 3
process_2 3
process_1 4
process_2 4
process_2 5
process_1 5
process_1 6
process_2 6
process_1 7
process_2 7
process_1 8
process_2 8
process_2 9
process_1 9

I do not need to use any lock, as only one process changes the value, otherwise, you need to use Value.lock().

CodePudding user response:

Simplest way is to use an Event. Keep in mind you have to initialize the event differently, as multiprocessing.Pool cannot pass around synchronization primitives that can't be pickled:

import multiprocessing
import time

def initialize_event(e):
    global event
    event = e

def do_smth(value):
  a = 0
  if value == "1":
    while not event.is_set():
      time.sleep(0.5)
      print("process_1", a)
      if a == 10: break
  if value == "2":
    while a < 10:
      time.sleep(0.5)
      print("process_2", a)
      a  =1
      if a == 5: event.set()

def make_a_pool(all):
  event = multiprocessing.Event()
  with multiprocessing.Pool(processes=2,
                            initializer=initialize_event, initargs=(event,)
                            ) as pool:
    pool.map(do_smth, all)

if __name__ == "__main__":
    all = ["1", "2"]
    make_a_pool(all)

Output (you didn't advance a on value=="1"):

process_2 0
process_1 0
process_1 0
process_2 1
process_1 0
process_2 2
process_2 3
process_1 0
process_2 4
process_1 0
process_2 5
process_2 6
process_2 7
process_2 8
process_2 9
  • Related