Thread-safe way to use preloaded data in a for-loop

Suppose we apply a set of (in-place) operations within a for-loop on mostly the same fundamental data (which is mutable). What is a memory-efficient (and thread-safe) way to do so?

Note that the fundamental data should not be altered from one iteration to the next within the for-loop.

Example Code:

Assume we have some Excel files containing fundamental data in a data directory, and some additional data in a some_more_data directory. I want to apply operations on the data retrieved from the data directory using the files from the some_more_data directory. Afterwards I want to write the results to a new pickle file.

import copy
import pickle
import pandas as pd

# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):
    data_dict = dict()
    for dname, dpath in info_dict.items():
        data_dict[dname] = pd.read_excel(dpath, index_col=0)

    return data_dict

# dictionary of data files
data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
raw_data = read_data(data_list)

# dictionary of files used for the operations (they, for example, have different indices)
some_more_data = {
    'some_data_a': 'some_more_data/some_data_a.xlsx',
    'some_data_b': 'some_more_data/some_data_b.xlsx'
    }
some_more_data = read_data(some_more_data)

# Apply operation to data (explicitly use a for-loop)
for smd_k, smd_v in some_more_data.items():
    rdata = copy.deepcopy(raw_data)

    rdata['price'] = rdata['price'].reindex(smd_v.index)
    rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)

    with open(f'data/changed_{smd_k}.pkl', 'wb') as handle:
        pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

Is the deepcopy operation in my example above thread-safe (assuming I want to use multithreading)? Or should I repeatedly load the data from Excel within the for-loop (which is very slow)? Or is there an even better way?

Thank you for your help.


Code to Generate Sample DataFrames and Save Data in Excel Files

Note that the directories data and some_more_data must be created manually first.

import pandas as pd
import numpy as np


price = pd.DataFrame([[-1.332298,  0.396217,  0.574269, -0.679972, -0.470584,  0.234379],
                      [-0.222567,  0.281202, -0.505856, -1.392477,  0.941539,  0.974867],
                      [-1.139867, -0.458111, -0.999498,  1.920840,  0.478174, -0.315904],
                      [-0.189720, -0.542432, -0.471642,  1.506206, -1.506439,  0.301714]],
                     columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'], 
                     index=pd.date_range('2000', freq='D', periods=4))

eps = pd.DataFrame([[-1.91,  1.63,  0.51, -.32, -0.84,  0.37],
                      [-0.56,  0.02, 0.56, 1.77,  0.99,  0.97],
                      [-1.67, -0.41, -0.98,  1.20,  0.74, -0.04],
                      [-0.80, -0.43, -0.12,  1.06, 1.59,  0.34]],
                     columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'], 
                     index=pd.date_range('2000', freq='D', periods=4))

some_data_a = pd.DataFrame(np.random.randint(0, 100, size=(4, 6)),
                           columns=['IBM', 'MSFT', 'APPL', 'ORCL', 'FB', 'TWTR'],
                           index=pd.date_range('2001', freq='D', periods=4))
some_data_b = pd.DataFrame(np.random.randint(0, 100, size=(20, 6)),
                           columns=['GM', 'TSLA', 'IBM', 'MSFT', 'APPL', 'ORCL'],
                           index=pd.date_range('2000', freq='D', periods=20))

price.to_excel('data/price.xlsx')
eps.to_excel('data/eps.xlsx')
some_data_a.to_excel('some_more_data/some_data_a.xlsx')
some_data_b.to_excel('some_more_data/some_data_b.xlsx')

CodePudding user response:

Once your raw_data dictionary has been created, I don't see where it is ever modified (after all, that is the point of using deepcopy on it). So while deep-copying a mutable object is not thread-safe in general, this particular object is not undergoing mutation at any time, and I don't see why there would be an issue. But you could always do the deepcopy under control of a lock if you were not confident.
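
For example, a minimal sketch of doing the deepcopy under a lock (the names copy_lock and make_working_copy are illustrative, not part of your code):

import copy
import threading

copy_lock = threading.Lock()   # one lock shared by all worker threads

def make_working_copy(raw_data):
    # Serialize only the deepcopy itself; the work done on the copy
    # afterwards does not need to hold the lock.
    with copy_lock:
        return copy.deepcopy(raw_data)

Each worker thread would then call make_working_copy(raw_data) instead of calling copy.deepcopy(raw_data) directly.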

If you are doing this with multithreading, then using a threading.Lock is probably not going to cost you any performance: the deepcopy operation is all CPU, and you cannot achieve any deepcopy parallelism anyway because the thread performing it is already holding the Global Interpreter Lock (GIL) for that function (it is primarily Python bytecode). The additional lock just prevents a thread from giving up its time slice in the middle of a deepcopy to another thread that might begin its own deepcopy (but again, I still don't think that is an issue). If you are using multithreading, the real question is what performance increase you will get from doing concurrent I/O operations. Depending on whether you have a hard disk drive or a solid state drive and what the characteristics of that drive are, concurrency might even hurt your I/O performance. You may get some improvement from the Pandas operations if they release the GIL.

Multiprocessing, which does provide true parallelism for CPU-intensive functions, has its own overhead in creating the processes and in passing data from one address space to another (i.e. from one process to another). This additional overhead, which you do not have in serial processing, has to be compensated for by the savings achieved by parallelizing your calculations. It's not clear from what you have shown, if it is indeed representative of your actual situation, that you would gain anything from that parallelism. But then, of course, you would not have to worry about the thread safety of deepcopy at all: once each process has its own copy of raw_data, it runs as a single thread with its own memory, completely isolated from the other processes.

Summary

  1. In general, deepcopy is not thread-safe for mutable objects, but since your object does not appear to be "mutating", it shouldn't be an issue. If running under multithreading, you could still do the deepcopy as an atomic operation under control of a threading.Lock without any significant loss in performance.

  2. If you are using multiprocessing, and assuming raw_data is not placed in shared memory, then each process works on its own copy of raw_data to begin with. So even if another process were "mutating" its copy of raw_data, as long as each process runs a single thread there is no need to worry about the thread safety of deepcopy.

  3. It's not clear whether multithreading or multiprocessing will achieve any performance improvements based on the code I have seen.
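
Note also that, as far as I can tell, DataFrame.reindex returns a new DataFrame rather than modifying its input, so in the code shown the deepcopy could probably be avoided altogether by building a fresh dictionary in each iteration. A sketch of that alternative (my own suggestion, not part of the original code):

# Possible alternative: no deepcopy needed, since reindex does not modify raw_data in place.
for smd_k, smd_v in some_more_data.items():
    rdata = {
        'price': raw_data['price'].reindex(smd_v.index),
        'eps': raw_data['eps'].reindex(columns=smd_v.columns),
    }

    with open(f'data/changed_{smd_k}.pkl', 'wb') as handle:
        pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

Whether that applies to your real code depends on every operation in the loop being similarly non-mutating.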

Benchmark

This benchmarks the serial, multithreading, and multiprocessing versions. With only 2 keys in each dictionary it is perhaps not a realistic example, but it gives a general idea:

import copy
import pickle
import pandas as pd
import time
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count


# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):
    data_dict = dict()
    for dname, dpath in info_dict.items():
        data_dict[dname] = pd.read_excel(dpath, index_col=0)

    return data_dict

def serial(raw_data, some_more_data, suffix):
    # Apply operation to data (explicitly use a for-loop)
    for smd_k, smd_v in some_more_data.items():
        rdata = copy.deepcopy(raw_data)

        rdata['price'] = rdata['price'].reindex(smd_v.index)
        rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)

        with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
            pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

def init_pool(r_d, sfx):
    global raw_data, suffix
    raw_data = r_d
    suffix = sfx

def worker(smd_k, smd_v):
    rdata = copy.deepcopy(raw_data)

    rdata['price'] = rdata['price'].reindex(smd_v.index)
    rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)

    with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
        pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

def benchmark1(raw_data, some_more_data):
    start_time = time.time()
    serial(raw_data, some_more_data, '1')
    elapsed = time.time() - start_time
    print('Serial time:', elapsed)

def benchmark2(raw_data, some_more_data):
    start_time = time.time()
    items = list(some_more_data.items())
    pool_size = len(items)
    pool = ThreadPool(pool_size, initializer=init_pool, initargs=(raw_data, '2'))
    pool.starmap(worker, items)
    elapsed = time.time() - start_time
    print('Multithreading time:', elapsed)
    pool.close()
    pool.join()

def benchmark3(raw_data, some_more_data):
    start_time = time.time()
    items = list(some_more_data.items())
    pool_size = min(len(items), cpu_count())
    pool = Pool(pool_size, initializer=init_pool, initargs=(raw_data, '3'))
    pool.starmap(worker, items)
    elapsed = time.time() - start_time
    print('Multiprocessing time:', elapsed)
    pool.close()
    pool.join()

def main():
    # dictionary of data files
    data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
    raw_data = read_data(data_list)

    # dictionary of files used for the operations (they, for example, have different indices)
    some_more_data = {
        'some_data_a': 'some_more_data/some_data_a.xlsx',
        'some_data_b': 'some_more_data/some_data_b.xlsx'
        }
    some_more_data = read_data(some_more_data)

    benchmark1(raw_data, some_more_data)
    benchmark2(raw_data, some_more_data)
    benchmark3(raw_data, some_more_data)

if __name__ == '__main__':
    main()

Prints:

Serial time: 0.002997159957885742
Multithreading time: 0.013999462127685547
Multiprocessing time: 0.7790002822875977