My project has a sensor with a Python driver that collects readings into a numpy array. The array then needs statistical processing that returns one or more processed arrays. Both the raw data and the processed data are saved to files. To increase data throughput, we want to process one set of data while the sensor driver concurrently collects the next set.
The driver consists of two classes: SensorInterface, which acts as the main interface to the sensor, and DataCaptureThread, whose main method runs in a thread that continuously reads the socket for data. In the high-level module, a call to a SensorInterface method triggers the DataCaptureThread to record data into the array and save it to a file after a preset duration has elapsed.
My contribution has been to develop the data processor: a single class DataHandler, which should receive the numpy array after it is generated and perform its routines. Once the array has been generated, the driver no longer needs it.
A very abbreviated version of the code follows (the classes actually live in different modules):
import threading
import numpy as np

class DataCaptureThread:
    def __init__(self, duration, parameters):
        self.duration = duration
        self.parameters = parameters
        self.capture_flag = False
        self.outgoing_data_open_flag = True
        self.thread = threading.Thread(target=self.read_data, args=(duration,))

    def read_data(self, duration):
        if self.capture_flag:
            while True:
                # Pretend this function gets the entire dataset from the socket and saves the file
                self.data_array = ReadSocket()
                # Wait for the other process to indicate it's ready to receive the array
                while not self.outgoing_data_open_flag:
                    continue
class SensorInterface:
    def __init__(self, parameters):
        self.parameters = parameters

    def data_stream_start(self):
        # Instantiate an object of the capture thread in this class
        self.capture_stream = DataCaptureThread(0.0, self.parameters)

    def collect_data(self, duration):
        self.capture_stream.duration = duration
        self.capture_stream.capture_flag = True
class DataHandler:
    def __init__(self):
        # Flag to indicate that the sensor process is not ready to transfer data
        self.incoming_data_ready_flag = False
        # Flag to indicate that this process is currently busy with data, cannot accept more
        self.data_full = False
        self.data_array = np.array([])

    def run_processing(self):
        while True:
            # Wait until data is ready
            while not self.incoming_data_ready_flag:
                continue
            # Set flag to indicate the process is busy with data
            self.data_full = True
            # Pretend this function encapsulates all data processing and saving
            DataProcessing(self.data_array)
            # Reset flag
            self.data_full = False
if __name__ == '__main__':
    import multiprocessing as mp

    # Create objects
    sensor = SensorInterface(params1)
    data_processor = DataHandler()
    duration = 3.0

    # Creating processes for data collection and analysis
    #collect_proc = mp.Process(target=sensor.collect_data, args=(duration,))
    data_proc = mp.Process(target=data_processor.run_processing, args=())

    # Start and join processes
    data_proc.start()
    data_proc.join()
This is almost pseudocode, and I may have neglected to initialize variables or made slight mistakes.
The intention behind the flags outgoing_data_open_flag, data_full, and incoming_data_ready_flag is to block the transfer of the array from one process to the other until the receiver is ready.
My question is: how do I communicate the flags and the array between the processes? Python's multiprocessing module has been opaque to me, and I have been unable to figure out a good way of communicating this data, since both processes are class methods and the flags/data are class attributes.
Note that I would like to avoid excessive changes to the structure of the driver classes; they are very complex, and reworking them would be time-consuming. I have more direct control over the data processing class and the main function.
CodePudding user response:
Sharing the flags
You can use multiprocessing.Value to store the flags in shared memory. These need to be created in the main process and passed to the child processes, which keep references to them. Furthermore, to read or change a flag, you must use its value attribute rather than comparing or assigning the Value object directly.
Keep in mind that shared memory is not thread-safe by itself, but multiprocessing.Value accepts the keyword argument lock=True on creation (this is the default) so that access is synchronized internally with a lock. Sample change in your code:
class DataCaptureThread:
    def __init__(self, capture_flag, outgoing_flag, duration, parameters):
        ...
        self.capture_flag = capture_flag
        self.outgoing_data_open_flag = outgoing_flag
        ...

    def read_data(self, duration):
        if self.capture_flag.value:
            while True:
                # Pretend this function gets the entire dataset from the socket and saves the file
                self.data_array = ReadSocket()
                # Wait for the other process to indicate it's ready to receive the array
                while not self.outgoing_data_open_flag.value:
                    continue
class SensorInterface:
    ...

    def collect_data(self, duration):
        self.capture_stream.duration = duration
        self.capture_stream.capture_flag.value = True
class DataHandler:
    def __init__(self, incoming_data_ready_flag, data_full):
        # Flag to indicate that the sensor process is not ready to transfer data
        self.incoming_data_ready_flag = incoming_data_ready_flag
        # Flag to indicate that this process is currently busy with data, cannot accept more
        self.data_full = data_full
        self.data_full.value = False
        ...

    def run_processing(self):
        while True:
            # Wait until data is ready
            while not self.incoming_data_ready_flag.value:
                continue
            # Set flag to indicate the process is busy with data
            self.data_full.value = True
            # Pretend this function encapsulates all data processing and saving
            DataProcessing(self.data_array)
            # Reset flag
            self.data_full.value = False
if __name__ == '__main__':
    import multiprocessing as mp
    from ctypes import c_bool

    # Create flags and set their initial values
    outgoing_data_open_flag = mp.Value(c_bool, False)
    data_full = mp.Value(c_bool, False)
    incoming_data_ready_flag = mp.Value(c_bool, False)

    # Create objects (remember to pass these flags to them as arguments!)
    ...
Sharing the array
As for communicating the contents of the actual arrays, it is hard to suggest the best way, since your pseudocode abstracts away how you obtain the data. Normally, a Queue would be a good way to go about this, but you can also use a Pipe if there is going to be only one producer and one consumer, each using one end. Pipes are not thread-safe (unlike queues) but are generally faster than queues. If you do decide to use either, they also need to be created in the main process and passed to the child processes.
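As a minimal sketch of the Queue approach (the produce function below is a hypothetical stand-in for your ReadSocket-based capture loop, not your actual driver code):

import multiprocessing as mp
import numpy as np

def produce(queue):
    # Stand-in for the capture side: generate a few arrays and send them
    for _ in range(3):
        data = np.random.rand(1000)  # pretend this came from the sensor
        queue.put(data)              # the array is pickled and sent to the consumer
    queue.put(None)                  # sentinel to signal that no more data is coming

def consume(queue):
    while True:
        data = queue.get()           # blocks until an array (or the sentinel) arrives
        if data is None:
            break
        print(f"processing array with mean {data.mean():.3f}")

if __name__ == '__main__':
    q = mp.Queue()
    producer = mp.Process(target=produce, args=(q,))
    consumer = mp.Process(target=consume, args=(q,))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

Note that queue.get() blocks until data arrives, so a blocking hand-off like this can replace the busy-wait flag loops entirely.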
Additionally, there are also shared memory blocks that you can create and store the array in directly, using multiprocessing.shared_memory. Any child process can attach to this memory block and access its contents, making this much faster than queues and pipes, with the tradeoff that shared memory blocks are a little more complicated to deal with, which I think you are trying to avoid.
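As a rough illustration of that approach (available since Python 3.8, and assuming both processes agree on the array's shape and dtype in advance; shown here in one process for brevity):

from multiprocessing import shared_memory
import numpy as np

# Producer side: create a shared block and copy the array into it
arr = np.random.rand(1000)
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
shared_arr[:] = arr[:]

# Consumer side (normally in another process): attach by name and view the data
existing = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray((1000,), dtype=np.float64, buffer=existing.buf)
print(view.mean())

# Cleanup: every process closes its handle; exactly one process unlinks the block
existing.close()
shm.close()
shm.unlink()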
Edit
Regarding the choice of method to share the arrays, it is mostly a question of how much flexibility you are willing to trade for speed. For example, here are some questions you should ask yourself:
- How many different processes need the contents of the array once it is populated? If more than one, then Queues/Pipes become expensive, since you would need to create one per consumer to ensure each gets the required data.
- What is the content of the arrays? Remember that all data transferred from one process to another (as with queues/pipes) needs to be pickled. So if the object being put in the queue is fairly complex (or big), pickling and then unpickling adds extra overhead. On the flip side, shared memory is usually very restrictive about what it can and cannot store (more about this below).
- How much time am I willing to spend on this? Like I said, queues and pipes are very straightforward to implement; you could do it in half an evening and call it a day. They are also very flexible: you can put almost anything picklable in them, making things easier on future you if the requirements for the code change. Shared memory, on the other hand, comes with its usual set of headaches and bottlenecks (thread-safety chief among them). If you don't want to go as low-level as working with memory blocks, there is also multiprocessing.Array, which works similarly to mp.Value but holds a fixed-size array; see the sketch below. But again, it is restrictive in what it can store (I have never used it, so there may be workarounds that I am not aware of; take this with a grain of salt).
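To illustrate that last option, here is a minimal sketch of my own (assuming a fixed-size array of doubles, which is the main restriction of this approach) that wraps a multiprocessing.Array in a numpy view so both processes operate on the same buffer:

import multiprocessing as mp
import numpy as np

N = 1000  # both processes must agree on the size up front

def fill(shared, ready):
    # Producer: write into the shared buffer, then signal the consumer
    buf = np.frombuffer(shared.get_obj())  # numpy view over the shared ctypes array
    buf[:] = np.random.rand(N)
    ready.value = True

if __name__ == '__main__':
    shared = mp.Array('d', N)     # 'd' = C double; lock=True by default
    ready = mp.Value('b', False)  # 'b' = signed char, used here as a bool flag
    p = mp.Process(target=fill, args=(shared, ready))
    p.start()
    p.join()
    result = np.frombuffer(shared.get_obj())
    print(ready.value, result.mean())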