Suppose we apply a set of (in-place) operations within a for-loop on mostly the same fundamental (mutable) data. What is a memory-efficient (and thread-safe) way to do so?
Note that the fundamental data should not be altered within the for-loop from iteration to iteration.
Example Code:
Assume we have some Excel files containing fundamental data in a data directory. Further, we have some additional data in the some_more_data directory. I want to apply operations on the data retrieved from the data directory using the files from the some_more_data directory. Afterwards I want to write the results to a new pickle file.
import copy
import pickle
import pandas as pd

# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):
    data_dict = dict()
    for dname, dpath in info_dict.items():
        data_dict[dname] = pd.read_excel(dpath, index_col=0)
    return data_dict
# dict of data files
data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
raw_data = read_data(data_list)

# dict of files used for the operation (they, for example, have different indices)
some_more_data = {
    'some_data_a': 'some_more_data/some_data_a.xlsx',
    'some_data_b': 'some_more_data/some_data_b.xlsx'
}
some_more_data = read_data(some_more_data)
# Apply operation to data (explicitly use a for-loop)
for smd_k, smd_v in some_more_data.items():
    rdata = copy.deepcopy(raw_data)
    rdata['price'] = rdata['price'].reindex(smd_v.index)
    rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)
    with open(f'data/changed_{smd_k}.pkl', 'wb') as handle:
        pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)
Is the deepcopy operation in my example above thread safe (assuming I want to use multithreading)? Or should I repeatedly reload the data from Excel within the for-loop (very slow)? Or is there an even better way?
Thank you for your help.
Code to Generate Sample DataFrames and Save Data in Excel Files
Note that the directories data and some_more_data must be created manually first.
import pandas as pd
import numpy as np
price = pd.DataFrame([[-1.332298, 0.396217, 0.574269, -0.679972, -0.470584, 0.234379],
                      [-0.222567, 0.281202, -0.505856, -1.392477, 0.941539, 0.974867],
                      [-1.139867, -0.458111, -0.999498, 1.920840, 0.478174, -0.315904],
                      [-0.189720, -0.542432, -0.471642, 1.506206, -1.506439, 0.301714]],
                     columns=['IBM', 'MSFT', 'APPL', 'ORCL', 'FB', 'TWTR'],
                     index=pd.date_range('2000', freq='D', periods=4))
eps = pd.DataFrame([[-1.91, 1.63, 0.51, -0.32, -0.84, 0.37],
                    [-0.56, 0.02, 0.56, 1.77, 0.99, 0.97],
                    [-1.67, -0.41, -0.98, 1.20, 0.74, -0.04],
                    [-0.80, -0.43, -0.12, 1.06, 1.59, 0.34]],
                   columns=['IBM', 'MSFT', 'APPL', 'ORCL', 'FB', 'TWTR'],
                   index=pd.date_range('2000', freq='D', periods=4))
some_data_a = pd.DataFrame(np.random.randint(0, 100, size=(4, 6)),
                           columns=['IBM', 'MSFT', 'APPL', 'ORCL', 'FB', 'TWTR'],
                           index=pd.date_range('2001', freq='D', periods=4))
some_data_b = pd.DataFrame(np.random.randint(0, 100, size=(20, 6)),
                           columns=['GM', 'TSLA', 'IBM', 'MSFT', 'APPL', 'ORCL'],
                           index=pd.date_range('2000', freq='D', periods=20))
price.to_excel('data/price.xlsx')
eps.to_excel('data/eps.xlsx')
some_data_a.to_excel('some_more_data/some_data_a.xlsx')
some_data_b.to_excel('some_more_data/some_data_b.xlsx')
CodePudding user response:
Once your raw_data dictionary has been created, I don't see where it is ever modified (after all, that is the point of using deepcopy on it). So while deep-copying a mutable object is not thread safe in general, this particular object is not undergoing mutation at any time, so I don't see why there would be an issue. But you could always do the deepcopy under the control of a lock if you were not confident.
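As a minimal sketch of that lock-guarded deepcopy (the lock and helper names here are illustrative, not from the original code):

import copy
import threading

copy_lock = threading.Lock()  # one lock shared by all worker threads

def make_working_copy(raw_data):
    # Serialize deepcopy calls so no two threads run one concurrently.
    with copy_lock:
        return copy.deepcopy(raw_data)

Each worker thread would then call make_working_copy(raw_data) instead of calling copy.deepcopy(raw_data) directly.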
If you are doing this with multithreading, then using a threading.Lock is probably not going to cost you any performance: the deepcopy operation is all CPU, and you cannot achieve any deepcopy parallelism anyway because your thread already holds the Global Interpreter Lock (GIL) for that function (it is primarily Python bytecode). The additional lock merely prevents a thread from giving up its time slice in the middle of a deepcopy to another thread that might begin its own deepcopy (but again, I still don't think that is an issue). And if you are using multithreading, what performance increase would you actually get from concurrent I/O? Depending on whether you have a hard disk drive or a solid state drive, and on the characteristics of that drive, concurrency might even hurt your I/O performance. You may get some performance improvement from the pandas operations if they release the GIL.
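A rough way to check whether a given pandas operation benefits from threads at all is to time it both ways. This is an illustrative sketch only, with df.rank() standing in for whatever operation you actually apply:

import time
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool

df = pd.DataFrame(np.random.rand(2000, 2000))

def op(_):
    return df.rank()  # stand-in for your pandas operation

t0 = time.time()
for i in range(4):
    op(i)
print('serial :', time.time() - t0)

t0 = time.time()
with ThreadPool(4) as pool:
    pool.map(op, range(4))
print('threads:', time.time() - t0)

If the threaded version is no faster, the operation is holding the GIL and multithreading will not help the CPU-bound part.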
Multiprocessing, which does provide true parallelism for CPU-intensive functions, has its own overhead in creating the processes and in passing data from one address space to another (i.e. from one process to another). This additional overhead, which you do not have in serial processing, has to be compensated for by the savings achieved by parallelizing your calculations. It's not clear from what you have shown, if it is indeed representative of your actual situation, that you would gain anything from that parallelism. But then, of course, you would not have to worry about the thread safety of deepcopy: once each process has its own copy of raw_data, it runs a single thread with its memory totally isolated from the other processes.
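To see that isolation concretely, here is a small self-contained demonstration (assuming, as in the benchmark below, that the dictionary is handed to each worker via the pool initializer): mutations made inside a worker process never reach the parent or the other workers.

import os
from multiprocessing import Pool

def init_pool(shared):
    global data
    data = shared  # each child process receives its own pickled copy

def mutate(_):
    data['key'] = os.getpid()  # mutation stays local to this process
    return data['key']

if __name__ == '__main__':
    data = {'key': 0}
    with Pool(2, initializer=init_pool, initargs=(data,)) as pool:
        print(pool.map(mutate, range(2)))  # values set inside the workers
    print(data['key'])  # the parent's copy is unchanged: 0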
Summary
1. In general, deepcopy is not thread safe for mutable objects, but since your object does not appear to be "mutating", it shouldn't be an issue. If running under multithreading, you could still do the deepcopy as an atomic operation under the control of a threading.Lock without any significant loss in performance.
2. If you are using multiprocessing, and assuming raw_data is not held in shared memory, then each process works on its own copy of raw_data to begin with. So even if another process were "mutating" its raw_data, as long as each process runs a single thread, there is no need to worry about the thread safety of deepcopy.
3. It's not clear whether multithreading or multiprocessing will achieve any performance improvement based on the code I have seen.
Benchmark
This benchmarks the serial, multithreaded, and multiprocessing versions. With only 2 keys in each dictionary it is perhaps not a realistic workload, but it gives a general idea:
import copy
import pickle
import pandas as pd
import time
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count

# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):
    data_dict = dict()
    for dname, dpath in info_dict.items():
        data_dict[dname] = pd.read_excel(dpath, index_col=0)
    return data_dict

def serial(raw_data, some_more_data, suffix):
    # Apply operation to data (explicitly use a for-loop)
    for smd_k, smd_v in some_more_data.items():
        rdata = copy.deepcopy(raw_data)
        rdata['price'] = rdata['price'].reindex(smd_v.index)
        rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)
        with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
            pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

def init_pool(r_d, sfx):
    # Make the raw data and filename suffix available to the pool workers.
    global raw_data, suffix
    raw_data = r_d
    suffix = sfx

def worker(smd_k, smd_v):
    rdata = copy.deepcopy(raw_data)
    rdata['price'] = rdata['price'].reindex(smd_v.index)
    rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)
    with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
        pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

def benchmark1(raw_data, some_more_data):
    start_time = time.time()
    serial(raw_data, some_more_data, '1')
    elapsed = time.time() - start_time
    print('Serial time:', elapsed)

def benchmark2(raw_data, some_more_data):
    start_time = time.time()
    items = list(some_more_data.items())
    pool_size = len(items)
    pool = ThreadPool(pool_size, initializer=init_pool, initargs=(raw_data, '2'))
    pool.starmap(worker, items)
    elapsed = time.time() - start_time
    print('Multithreading time:', elapsed)
    pool.close()
    pool.join()

def benchmark3(raw_data, some_more_data):
    start_time = time.time()
    items = list(some_more_data.items())
    pool_size = min(len(items), cpu_count())
    pool = Pool(pool_size, initializer=init_pool, initargs=(raw_data, '3'))
    pool.starmap(worker, items)
    elapsed = time.time() - start_time
    print('Multiprocessing time:', elapsed)
    pool.close()
    pool.join()

def main():
    # dict of data files
    data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
    raw_data = read_data(data_list)

    # dict of files used for the operation (they, for example, have different indices)
    some_more_data = {
        'some_data_a': 'some_more_data/some_data_a.xlsx',
        'some_data_b': 'some_more_data/some_data_b.xlsx'
    }
    some_more_data = read_data(some_more_data)

    benchmark1(raw_data, some_more_data)
    benchmark2(raw_data, some_more_data)
    benchmark3(raw_data, some_more_data)

if __name__ == '__main__':
    main()
Prints (with DataFrames this small, the pool startup and data-transfer overhead dominates):
Serial time: 0.002997159957885742
Multithreading time: 0.013999462127685547
Multiprocessing time: 0.7790002822875977