I need to aggregate data from a high-frequency time series into a lower-frequency one. For example, I have a price time series with a frequency of one minute and, for the same time range, a price time series with a frequency of one hour. For each hourly period I need to calculate some stats from the one-minute series.
If both time series fit in memory, I can load them and solve my requirement easily with pandas' apply function. Unfortunately, that is not the reality for me (and something tells me I'm not the only one). So I'm trying to use Dask instead, but when I mimic what I do with pandas, I get the following error:
Series getitem is only supported for other series objects with matching partition structure
The full code and error follow, but first my questions: Am I doing something wrong? Is it possible to achieve this with Dask? Is there any other alternative? If not, I will probably have to split the datasets into smaller ones that fit in memory and process them with pandas.
Example
First I generate data with a random walk.
import numpy as np
import pandas as pd
import dask.dataframe as dd
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]  # accumulate the white noise
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
Second, I try with pandas
def my_func(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min['bid'][(df_min.index >= start_date) & (df_min.index < end_date)].mean()
result_pd = df_h.apply(lambda row: my_func(row), axis=1)
result_pd
date | mean bid
---|---
2020-01-24 13:00:00 | 1.128743
2020-01-24 14:00:00 | 1.127739
2020-01-24 15:00:00 | 1.130548
... | ...
2021-12-18 23:00:00 | 0.482139
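As a sanity check for this toy example only: the bins here are plain calendar hours and the statistic is just a mean (a stand-in for the stats I actually need), so the same series can also be produced directly with a resample:
check = df_min['bid'].resample('1H').mean()  # matches result_pd except for the final hourly bin, which df_h drops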
Finally, I give it a try with Dask:
dd_min = dd.from_pandas(df_min, npartitions=10)
dd_h = dd.from_pandas(df_h, npartitions=6)
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return dd_min['bid'][(dd_min.index >= start_date) & (dd_min.index < end_date)].mean()
res = dd_h.map_partitions(lambda df: df.apply(lambda row: my_func_2(row), axis=1), meta=('x', 'float64'))
result_dd = res.compute()
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<timed exec> in <module>
c:\mch_py_38\lib\site-packages\dask\base.py in compute(self, **kwargs)
284 dask.base.compute
285 """
--> 286 (result,) = compute(self, traverse=False, **kwargs)
287 return result
288
c:\mch_py_38\lib\site-packages\dask\base.py in compute(*args, **kwargs)
566 postcomputes.append(x.__dask_postcompute__())
567
--> 568 results = schedule(dsk, keys, **kwargs)
569 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
570
c:\mch_py_38\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
77 pool = MultiprocessingPoolExecutor(pool)
78
---> 79 results = get_async(
80 pool.submit,
81 pool._max_workers,
c:\mch_py_38\lib\site-packages\dask\local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
515 _execute_task(task, data) # Re-execute locally
516 else:
--> 517 raise_exception(exc, tb)
518 res, worker_id = loads(res_info)
519 state["cache"][key] = res
c:\mch_py_38\lib\site-packages\dask\local.py in reraise(exc, tb)
323 if exc.__traceback__ is not tb:
324 raise exc.with_traceback(tb)
--> 325 raise exc
326
327
c:\mch_py_38\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
221 try:
222 task, data = loads(task_info)
--> 223 result = _execute_task(task, data)
224 id = get_id()
225 result = dumps((result, id))
c:\mch_py_38\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
c:\mch_py_38\lib\site-packages\dask\optimization.py in __call__(self, *args)
967 if not len(args) == len(self.inkeys):
968 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
970
971 def __reduce__(self):
c:\mch_py_38\lib\site-packages\dask\core.py in get(dsk, out, cache)
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
c:\mch_py_38\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
c:\mch_py_38\lib\site-packages\dask\utils.py in apply(func, args, kwargs)
33 def apply(func, args, kwargs=None):
34 if kwargs:
---> 35 return func(*args, **kwargs)
36 else:
37 return func(*args)
c:\mch_py_38\lib\site-packages\dask\dataframe\core.py in apply_and_enforce(*args, **kwargs)
5830 func = kwargs.pop("_func")
5831 meta = kwargs.pop("_meta")
-> 5832 df = func(*args, **kwargs)
5833 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
5834 if not len(df):
<timed exec> in <lambda>(df)
c:\mch_py_38\lib\site-packages\pandas\core\frame.py in apply(self, func, axis, raw, result_type, args, **kwargs)
8734 kwargs=kwargs,
8735 )
-> 8736 return op.apply()
8737
8738 def applymap(
c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply(self)
686 return self.apply_raw()
687
--> 688 return self.apply_standard()
689
690 def agg(self):
c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply_standard(self)
810
811 def apply_standard(self):
--> 812 results, res_index = self.apply_series_generator()
813
814 # wrap results
c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply_series_generator(self)
826 for i, v in enumerate(series_gen):
827 # ignore SettingWithCopy here in case the user mutates
--> 828 results[i] = self.f(v)
829 if isinstance(results[i], ABCSeries):
830 # If we have a view on v, we need to make a copy because
<timed exec> in <lambda>(row)
<ipython-input-30-8ca073921a34> in my_func_2(row)
5 start_date = pd.to_datetime(row['open_date'])
6 end_date = pd.to_datetime(row['close_date'])
----> 7 return dd_min['bid'][(dd_min.index >=start_date)&(dd_min.index < end_date)].mean()
c:\mch_py_38\lib\site-packages\dask\dataframe\core.py in __getitem__(self, key)
3260 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self, key])
3261 return Series(graph, name, self._meta, self.divisions)
-> 3262 raise NotImplementedError(
3263 "Series getitem is only supported for other series objects "
3264 "with matching partition structure"
NotImplementedError: Series getitem is only supported for other series objects with matching partition structure
CodePudding user response:
Following @mdurant's advice and fixing what @SultanOrazbayev pointed out worked! But I don't know what I'm doing wrong that makes this solution so much slower than pandas itself.
import pandas as pd
import dask.dataframe as dd
import numpy as np
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]  # accumulate the white noise
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
dd_min = dd.from_pandas(df_min, npartitions=10)
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return dd_min.loc[(dd_min.index >= start_date) & (dd_min.index < end_date), 'bid'].mean().compute()
%%time
df_h.dropna(inplace=True)
res = df_h.apply(lambda row: my_func_2(row), axis=1)
Wall time: 8min 46s
Pandas only
%%time
def my_func(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min.loc[(df_min.index >= start_date) & (df_min.index < end_date), 'bid'].mean()
result_pd = df_h.apply(lambda row: my_func(row), axis=1)
Wall time: 2min 11s
This solution is out of my solution space.
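A likely reason for the slowdown: my_func_2 calls .compute() once per hourly row, so Dask builds and executes a separate task graph over all ten partitions for each of the roughly 16,000 hourly rows. For the toy example, where the bins are plain calendar hours, the whole computation could be expressed as a single graph with a resample on the Dask series; this is only a sketch I have not benchmarked against my real datasets:
hourly_mean = dd_min['bid'].resample('1H').mean().compute()  # one graph for all hourly bins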
Pandas + pyarrow solution.
I have discarded Dask for now and decided to try a solution combining pandas and pyarrow, reading the hourly-frequency dataset in chunks of n rows. From each chunk I take the date range limits and use them as filters to read from the minute-wise parquet dataset with pyarrow:
import pandas as pd
import numpy as np
import pyarrow.parquet
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]  # accumulate the white noise
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
# SAVE FILES
df_min.to_parquet('minute_dataset.parquet')
df_h.to_csv('hourly_dataset.csv')
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min.loc[(df_min.index >= start_date) & (df_min.index < end_date), 'bid'].mean()
%%time
# READ-CALCULATE LOOP
full_df = list()
for df in pd.read_csv('hourly_dataset.csv', chunksize=10):
    date_from = pd.to_datetime(df.iloc[0]['open_date'])
    date_to = pd.to_datetime(df.iloc[-1]['close_date'])
    filter = [[('date', '>=', date_from), ('date', '<', date_to)]]
    # overwrites the module-level df_min, which is what my_func_2 reads
    df_min = pyarrow.parquet.read_table('minute_dataset.parquet', filters=filter).to_pandas()
    min_date_from = df_min.iloc[0].name
    min_date_to = df_min.iloc[-1].name
    df['mean'] = df.apply(lambda row: my_func_2(row), axis=1)
    full_df.append(df)
df_result = pd.concat(full_df)
Wall time: 1min 31s
This time the performance is within the solution space.
df_result
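One knob I have not tuned yet: parquet filter pushdown relies on row-group statistics, so the layout of minute_dataset.parquet determines how much data each chunked read can skip. Writing the file with an explicit row group size is a possible refinement; the one-day value below is just an untested guess:
df_min.to_parquet('minute_dataset.parquet', row_group_size=60 * 24)  # one day of minutes per row group (untested)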
CodePudding user response:
This is a quick hack on the answer provided by the OP; note there is a lot of scope for improvement, but this is just a demo of some of the ways to use dask. The snippet below ran on my machine in 30 sec, while the original snippet ran for about 52 sec. The improvement is not stellar, but there's a lot of room for optimization...
import pandas as pd
import numpy as np
import pyarrow.parquet
import dask
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]  # accumulate the white noise
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
# SAVE FILES
df_min.to_parquet('minute_dataset.parquet')
df_h.to_csv('hourly_dataset.csv')
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min.loc[(df_min.index >= start_date) & (df_min.index < end_date), 'bid'].mean()
@dask.delayed
def delayed_computations(df):
    date_from = pd.to_datetime(df.iloc[0]['open_date'])
    date_to = pd.to_datetime(df.iloc[-1]['close_date'])
    filter = [[('date', '>=', date_from), ('date', '<', date_to)]]
    # note: this binds a local df_min; my_func_2 above still reads the module-level one
    df_min = pyarrow.parquet.read_table('minute_dataset.parquet', filters=filter).to_pandas()
    min_date_from = df_min.iloc[0].name
    min_date_to = df_min.iloc[-1].name
    df['mean'] = df.apply(lambda row: my_func_2(row), axis=1)
    return df
# READ-CALCULATE LOOP
full_df = list()
for df in pd.read_csv('hourly_dataset.csv', chunksize=1000):
    full_df.append(delayed_computations(df))
full_df = dask.compute(*full_df)
df_result = pd.concat(full_df)
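Another cheap experiment (not benchmarked here): the per-chunk work is plain pandas code, which can be limited by the GIL under the default threaded scheduler. Whether the process-based scheduler helps would need to be measured, keeping in mind that data referenced by the tasks has to be shipped to the worker processes, but trying it is a one-line change to the compute call above:
full_df = dask.compute(*full_df, scheduler='processes')  # run the delayed chunks in separate processes instead of threads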