Sharing numpy array between processes collecting data (populating array) and parsing data (array operations)


My project has a sensor with a Python driver that collects readings and produces a numpy array; the array then needs some statistical processing, yielding one or more processed arrays. Both the raw data and the processed data are saved to files. To increase throughput, we want to process one set of data while the sensor driver collects the next set concurrently.

The driver comprises two classes: SensorInterface, which acts as the main interface to the sensor, and DataCaptureThread, whose main method runs in a thread that constantly reads the socket for data. In the high-level module, a call to a SensorInterface method initiates the DataCaptureThread to record data into the array and save it to a file after a preset duration has elapsed.

My contribution has been to develop the data processor: a single class, DataHandler, which receives the numpy array after it is generated and performs its routines. Once the array has been handed off, the driver no longer needs it.

A very abbreviated version of the code is as follows (the classes actually live in different modules):

import threading
import numpy as np

class DataCaptureThread():
    def __init__(self, duration, parameters):
        self.duration = duration
        self.parameters = parameters
        self.capture_flag = False
        self.outgoing_data_open_flag = True
        self.thread = threading.Thread(target=self.read_data, args=(duration,))

    def read_data(self, duration):
        if self.capture_flag:
            while True:
                #Pretend this function gets the entire dataset from the socket and saves the file
                self.data_array = ReadSocket()
                #Wait for the other process to indicate it's ready to receive the array
                while not self.outgoing_data_open_flag:
                    continue
                
class SensorInterface():
    def __init__(self, parameters):
        self.parameters = parameters

    def data_stream_start(self):
        #Instantiate the capture thread owned by this class (duration is set later)
        self.capture_stream = DataCaptureThread(0, self.parameters)

    def collect_data(self, duration):
        self.capture_stream.duration = duration
        self.capture_stream.capture_flag = True
                
class DataHandler():
    def __init__(self):
        #flag to indicate that sensor process is not ready to transfer data
        self.incoming_data_ready_flag = False
        #Flag to indicate that process is currently busy with data, cannot accept more
        self.data_full = False
        self.data_array = np.array([])
        
    def run_processing(self):
        while True:
            #Wait until data is ready
            while not self.incoming_data_ready_flag:
                continue
            #set flag to indicate process is busy with data
            self.data_full = True
            #Pretend this function encapsulates all data processing and saving  
            DataProcessing(self.data_array)
            #Reset flag
            self.data_full = False
               
if __name__ == '__main__':
    import multiprocessing as mp
    #Create objects (params1 is a placeholder for the real constructor arguments)
    sensor = SensorInterface(params1)
    data_processor = DataHandler()

    duration = 3.0
    #Creating processes for data analysis
    #collect_proc = mp.Process(target=sensor.collect_data, args=(duration,))
    data_proc = mp.Process(target=data_processor.run_processing, args=())
    #start and join processes
    data_proc.start()
    data_proc.join()

This is almost pseudocode, and I may have neglected to initialize variables or made slight mistakes.

The intention behind the flags outgoing_data_open_flag, data_full, and incoming_data_ready_flag is to block the transfer of the array from one process to the other until the receiver is ready.

My question is: how do I communicate the flags and the array between the processes? Python's multiprocessing module has been opaque to me, and I have been unable to figure out a good way of communicating these data, since both processes run class methods and the flags/data are class attributes.

Note that I would like to avoid excessive changes to the structure of the driver classes; they are complex, and reworking them would be time consuming. I have more direct control over the data processing class and the main function.

CodePudding user response:

Sharing the flags

You can use multiprocessing.Value to store the flags in shared memory. These need to be created in the main process and passed to the child processes, which store them. Furthermore, to read or change a flag, you must go through its value attribute rather than comparing the Value object directly with other objects.
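
For illustration, a minimal sketch of creating such a flag and the right way to test it:

import multiprocessing as mp
from ctypes import c_bool

flag = mp.Value(c_bool, False)

#Wrong: the Value wrapper object is always truthy, so this branch always runs
if flag:
    pass

#Right: read and write the underlying boolean through .value
flag.value = True
print(flag.value)    # True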

Keep in mind that shared memory is not inherently thread-safe, but multiprocessing.Value accepts the keyword argument lock=True (the default) to guard access with an internal lock. Sample change in your code:

class DataCaptureThread():
    def __init__(self, capture_flag, outgoing_flag, duration, parameters):
        .
        .
        self.capture_flag = capture_flag
        self.outgoing_data_open_flag = outgoing_flag
        .
        
    def read_data(self, duration):
        if self.capture_flag.value:
            while True:              
                #Pretend this function gets entire dataset from socket and saves file
                self.data_array = ReadSocket()
                #Wait for other process to indicate it's ready to receive array
                while not self.outgoing_data_open_flag.value:
                    continue
                
class SensorInterface():
    .
    .
    .       
    def collect_data(self, duration):
        
        self.capture_stream.duration = duration
        self.capture_stream.capture_flag.value = True
                
class DataHandler():
    def __init__(self, incoming_data_ready_flag, data_full):
        #flag to indicate that sensor process is not ready to transfer data
        self.incoming_data_ready_flag = incoming_data_ready_flag
        #Flag to indicate that process is currently busy with data, cannot accept more
        self.data_full = data_full
        self.data_full.value = False
        .
        
    def run_processing(self):
        while True:
            #Wait until data is ready
            while not self.incoming_data_ready_flag.value:
                continue
            #set flag to indicate process is busy with data
            self.data_full.value = True
            #Pretend this function encapsulates all data processing and saving  
            DataProcessing(self.data_array)
            #Reset flag
            self.data_full.value = False
               
if __name__ == '__main__':
    import multiprocessing as mp
    from ctypes import c_bool

    # Create flags and set their initial values
    capture_flag = mp.Value(c_bool, False)
    outgoing_data_open_flag = mp.Value(c_bool, False)
    data_full = mp.Value(c_bool, False)
    incoming_data_ready_flag = mp.Value(c_bool, False)
    
    #Create objects (remember to pass these flags to them as arguments!)
    .
    .
    .
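
To make the elided part concrete, here is one way the main block could continue. This is a sketch under the assumption that SensorInterface accepts the flags and forwards them to DataCaptureThread when it creates it; params1 stands in for your real constructor arguments:

    #Create objects, passing the shared flags through to them
    sensor = SensorInterface(params1, capture_flag, outgoing_data_open_flag)
    data_processor = DataHandler(incoming_data_ready_flag, data_full)

    #Run the processing loop in a child process
    data_proc = mp.Process(target=data_processor.run_processing, args=())
    data_proc.start()

    #Drive the sensor from the main process
    sensor.data_stream_start()
    sensor.collect_data(3.0)

    data_proc.join()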

Sharing the array

As for communicating the contents of the actual arrays, it is hard to suggest the best approach, since your pseudocode abstracts away how the data is acquired. Normally a Queue is a good way to go about this, but you can also use a Pipe if there will be only one consumer and one producer writing data from one end. Pipes are not thread-safe (unlike queues) but are generally faster. If you do decide to use them, they also need to be created in the main process and passed to the child processes.
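
As a rough sketch of the queue approach (the function names and array contents here are illustrative placeholders, not your driver's API):

import multiprocessing as mp
import numpy as np

def capture(queue):
    #Producer: pretend each iteration reads a full dataset from the socket
    for _ in range(3):
        data_array = np.random.rand(1000)   #stand-in for ReadSocket()
        queue.put(data_array)               #array is pickled and sent to the consumer
    queue.put(None)                         #sentinel: tell the consumer to stop

def process(queue):
    #Consumer: block until an array arrives, then process it
    while True:
        data_array = queue.get()
        if data_array is None:
            break
        print(data_array.mean())            #stand-in for DataProcessing(...)

if __name__ == '__main__':
    q = mp.Queue()
    producer = mp.Process(target=capture, args=(q,))
    consumer = mp.Process(target=process, args=(q,))
    producer.start(); consumer.start()
    producer.join(); consumer.join()

Note that queue.get() blocks until an item is available, which would also let you drop the busy-wait loops around the flags.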

Additionally, there are shared memory blocks you can create and store the array in directly, using multiprocessing.shared_memory. Any child process can attach to such a block and access its contents, making it much faster than queues and pipes, with the trade-off that the blocks are a little complicated to deal with, which I think you are trying to avoid.
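
For completeness, a minimal sketch of the shared-memory route (Python 3.8+; the shape and dtype here are assumptions, and in practice you would still need the flags above to coordinate access):

import numpy as np
from multiprocessing import shared_memory

#Producer side: create a block large enough for the array and copy data in
a = np.arange(10, dtype=np.float64)            #stand-in for the captured array
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
buf = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
buf[:] = a[:]                                  #copy once into shared memory

#Consumer side (normally in another process): attach by name, no copy made
existing = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray((10,), dtype=np.float64, buffer=existing.buf)
print(view.mean())

#Cleanup: close in every process, unlink once in the creating process
existing.close()
shm.close()
shm.unlink()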

Edit

Regarding the method to share arrays, it's much more about how much flexibility you are willing to trade for speed. For example, here are some questions you should ask yourself:

  1. How many different processes will need the contents of the array once it's populated? If it's more than one, then using Queues/Pipes would be expensive, since you would need to create one for each process to ensure each gets the required data.
  2. What is the content of the arrays? Remember that all data transferred from one process to another (as when using queues/pipes) needs to be pickled. So if the object being put in the queue is fairly complex (or big), pickling and then unpickling adds extra overhead. On the flip side, shared memory is usually very restrictive about what it can and can't store (more about this below).
  3. How much time am I willing to spend on this? Like I said, queues and pipes are very straightforward to implement; you could do it in half an evening and call it a day. They are also very flexible: you can put almost anything picklable in them, making it easier on future you if the requirements change. Shared memory, on the other hand, comes with its usual set of headaches and bottlenecks (thread-safety chief among them). If you don't want to go as low-level as working with memory blocks, there is also multiprocessing.Array, which works similarly to mp.Value (a small sketch follows this list), but it is again restrictive in what it can store (I have never used this, so there may be some workarounds that I am not aware of; take it with a grain of salt).
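
Since multiprocessing.Array came up in point 3, here is a small sketch of wrapping one in a numpy view; as I said, I have not used this much, so treat it as a starting point rather than a recipe:

import multiprocessing as mp
import numpy as np

#Shared buffer of 1000 doubles; size and type are fixed at creation
shared = mp.Array('d', 1000)                  #'d' = C double, lock=True by default

#Each process can wrap the same buffer in a numpy array without copying
arr = np.frombuffer(shared.get_obj(), dtype=np.float64)
with shared.get_lock():                       #guard concurrent access
    arr[:] = 0.0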