I have a really large pandas dataframe and I am trying to split it into multiple ones by stock name and save them to csv.
stock  date   time   spread  time_diff
VOD    01-01   9:05   0.01    0:07
VOD    01-01   9:12   0.03    0:52
VOD    01-01  10:04   0.02    0:11
VOD    01-01  10:15   0.01    0:10
BAT    01-01  10:25   0.03    0:39
BAT    01-01  11:04   0.02   22:00
BAT    01-02   9:04   0.02    0:05
BAT    01-01  10:15   0.01    0:10
BOA    01-01  10:25   0.03    0:39
BOA    01-01  11:04   0.02   22:00
BOA    01-02   9:04   0.02    0:05
I know how to do this in the conventional way:
def split_save(df):
    ids = df['stock'].unique()
    for id in ids:
        sub = df[df['stock'] == id]
        sub.to_csv(f'{my_path}/{id}.csv')
However, since I have a really large dataframe and thousands of stocks, I want to use multiprocessing for acceleration.
Any thoughts? (I might also try pyspark later.)
Thank you!
CodePudding user response:
Since this is I/O bound, I don't expect selecting the slices of the dataframe to be the main bottleneck.
So far, I can offer two ways to speed it up:
Threading: just launch each stock in a different thread, or use a ThreadPoolExecutor:
import concurrent.futures

def dump_csv(df, ticker):
    # Select this ticker's rows and write them to their own CSV file
    df[df['stock'] == ticker].to_csv(f'{my_path}/{ticker}.csv')

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(dump_csv, df, ticker): ticker for ticker in df['stock'].unique()}
    for future in concurrent.futures.as_completed(futures):
        print(f"Dumped ticker {futures[future]}")
(code not tested, adapted from the example)
Working in a ZIP file: for storing many, many files, a ZIP archive is a very good option, but whatever reads the files later has to support it.
For the sake of completeness:
import zipfile

with zipfile.ZipFile('stocks.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
    ids = df['stock'].unique()
    for id in ids:
        # to_csv() without a path returns the CSV as a string
        zf.writestr(f'{id}.csv', df[df['stock'] == id].to_csv())
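For the "reader" side, pandas can load a member of the archive through a file object. A minimal sketch, assuming the {id}.csv naming used above ('VOD.csv' is just one ticker from the sample data):

import zipfile
import pandas as pd

# Open the archive and load a single member back into a dataframe
with zipfile.ZipFile('stocks.zip') as zf:
    with zf.open('VOD.csv') as fh:
        vod = pd.read_csv(fh, index_col=0)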
CodePudding user response:
I doubt that groupby is what's holding you back, but for writing we could speed things up by multithreading, like this:
from concurrent.futures import ThreadPoolExecutor

# Number of cores/threads your CPU has/that you want to use.
workers = 4

def save_group(grouped):
    # groupby yields (name, sub-dataframe) pairs
    name, group = grouped
    group.to_csv(f'{name}.csv')

with ThreadPoolExecutor(workers) as pool:
    processed = pool.map(save_group, df.groupby('stock'))
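Since the question asks about multiprocessing specifically: the same pattern can be tried with a process pool. This is only a sketch, not benchmarked; each (name, group) pair gets pickled over to a worker process, so it only pays off if building the CSV text dominates the run time, and 'all_stocks.csv' below is just a placeholder for however you load the big dataframe.

from concurrent.futures import ProcessPoolExecutor
import pandas as pd

def save_group(grouped):
    # grouped is a (name, sub-dataframe) pair produced by groupby
    name, group = grouped
    group.to_csv(f'{name}.csv')

if __name__ == '__main__':
    df = pd.read_csv('all_stocks.csv')  # placeholder for your real data source
    with ProcessPoolExecutor(max_workers=4) as pool:
        # each group is pickled and sent to a worker process
        list(pool.map(save_group, df.groupby('stock')))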