Home > Software design >  Dask: Series getitem is only supported for other series objects with matching partition structure -
Dask: Series getitem is only supported for other series objects with matching partition structure -

Time:12-22

I need to aggregate data from a high-frequency time series into a lower-frequency one. For example, I have a price time series with a frequency of one minute and, for the same time range, a price time series with a frequency of one hour. For each hourly period I need to calculate some statistics from the one-minute series.

If both time series fit in memory, I can load them and solve my requirement easily with pandas apply function. Unfortunately, this is not the reality for me (and something tells me that I’m not the only one). So, to solve this I’m trying to use Dask, but when I try to mimic what I’m doing with pandas, I’m getting the following error:

Series getitem is only supported for other series objects with matching partition structure

The full code and error follow, but first my questions: Am I doing something wrong? Is it possible to achieve this with Dask? Is there any other alternative? Otherwise I will probably have to split the datasets into smaller ones so that they fit in memory and can be processed with pandas.

Example

First I generate data with a random walk.

import numpy as np
import pandas as pd
import dask.dataframe as dd

# parameters
size = 1000000
price_0 = 1.1300


# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size)  # white noise (normally distributed random data)

# random walk: price[0] = price_0 and price[i] = price[i-1] + wn[i-1].
# A cumulative sum over [price_0, wn[0], ..., wn[size-2]] evaluates exactly
# that recurrence without a million-iteration Python loop (assumes size >= 1).
price = np.cumsum(np.concatenate(([price_0], wn[:-1])))

df_min = pd.DataFrame({'date': index, 'bid': price, 'wn': wn}).set_index('date')

# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
# '1h' instead of '1H': the uppercase hourly alias is deprecated in recent pandas.
df_h = df_min['bid'].resample('1h').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)  # the next bar's open is this bar's close
# Drop rows with missing values, e.g. the last bar, whose close_date is NaT after the shift.
df_h = df_h.dropna()

Second, I try with pandas

def my_func(row):
    """Return the mean minute-level bid inside one hourly bar.

    `row` is a row of `df_h`; the window is half-open,
    [row['open_date'], row['close_date']), and is evaluated against the
    module-level `df_min`.
    """
    window_start = pd.to_datetime(row['open_date'])
    window_end = pd.to_datetime(row['close_date'])
    in_window = (df_min.index >= window_start) & (df_min.index < window_end)
    return df_min['bid'][in_window].mean()

result_pd = df_h.apply(my_func, axis=1)
result_pd

date
2020-01-24 13:00:00 1.128743
2020-01-24 14:00:00 1.127739
2020-01-24 15:00:00 1.130548
...
2021-12-18 23:00:00 0.482139

Finally, I give it a try with Dask

# Wrap both in-memory frames as Dask DataFrames (10 and 6 partitions respectively).
dd_min = dd.from_pandas(df_min, npartitions=10)
dd_h = dd.from_pandas(df_h, npartitions=6)

def my_func_2(row):
    """Dask counterpart of my_func: mean bid in [open_date, close_date).

    NOTE: the boolean indexing on the Dask series below is the line that
    raises the NotImplementedError shown in the traceback that follows.
    """
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    # Indexes the Dask series `dd_min['bid']` with a mask derived from `dd_min.index`.
    return dd_min['bid'][(dd_min.index >=start_date)&(dd_min.index < end_date)].mean()


# Each partition of dd_h is passed to the lambda, which applies my_func_2
# row by row; `meta` declares the expected result as a float64 series named 'x'.
res = dd_h.map_partitions(lambda df:    df. apply(lambda row: my_func_2(row), axis=1,),meta=('x', 'float64'))
# Nothing runs until compute(); the error surfaces here.
result_dd = res.compute()
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<timed exec> in <module>

c:\mch_py_38\lib\site-packages\dask\base.py in compute(self, **kwargs)
    284         dask.base.compute
    285         """
--> 286         (result,) = compute(self, traverse=False, **kwargs)
    287         return result
    288 

c:\mch_py_38\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

c:\mch_py_38\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     77             pool = MultiprocessingPoolExecutor(pool)
     78 
---> 79     results = get_async(
     80         pool.submit,
     81         pool._max_workers,

c:\mch_py_38\lib\site-packages\dask\local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    515                             _execute_task(task, data)  # Re-execute locally
    516                         else:
--> 517                             raise_exception(exc, tb)
    518                     res, worker_id = loads(res_info)
    519                     state["cache"][key] = res

c:\mch_py_38\lib\site-packages\dask\local.py in reraise(exc, tb)
    323     if exc.__traceback__ is not tb:
    324         raise exc.with_traceback(tb)
--> 325     raise exc
    326 
    327 

c:\mch_py_38\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

c:\mch_py_38\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

c:\mch_py_38\lib\site-packages\dask\optimization.py in __call__(self, *args)
    967         if not len(args) == len(self.inkeys):
    968             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    970 
    971     def __reduce__(self):

c:\mch_py_38\lib\site-packages\dask\core.py in get(dsk, out, cache)
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

c:\mch_py_38\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

c:\mch_py_38\lib\site-packages\dask\utils.py in apply(func, args, kwargs)
     33 def apply(func, args, kwargs=None):
     34     if kwargs:
---> 35         return func(*args, **kwargs)
     36     else:
     37         return func(*args)

c:\mch_py_38\lib\site-packages\dask\dataframe\core.py in apply_and_enforce(*args, **kwargs)
   5830     func = kwargs.pop("_func")
   5831     meta = kwargs.pop("_meta")
-> 5832     df = func(*args, **kwargs)
   5833     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   5834         if not len(df):

<timed exec> in <lambda>(df)

c:\mch_py_38\lib\site-packages\pandas\core\frame.py in apply(self, func, axis, raw, result_type, args, **kwargs)
   8734             kwargs=kwargs,
   8735         )
-> 8736         return op.apply()
   8737 
   8738     def applymap(

c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply(self)
    686             return self.apply_raw()
    687 
--> 688         return self.apply_standard()
    689 
    690     def agg(self):

c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply_standard(self)
    810 
    811     def apply_standard(self):
--> 812         results, res_index = self.apply_series_generator()
    813 
    814         # wrap results

c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply_series_generator(self)
    826             for i, v in enumerate(series_gen):
    827                 # ignore SettingWithCopy here in case the user mutates
--> 828                 results[i] = self.f(v)
    829                 if isinstance(results[i], ABCSeries):
    830                     # If we have a view on v, we need to make a copy because

<timed exec> in <lambda>(row)

<ipython-input-30-8ca073921a34> in my_func_2(row)
      5     start_date = pd.to_datetime(row['open_date'])
      6     end_date = pd.to_datetime(row['close_date'])
----> 7     return dd_min['bid'][(dd_min.index >=start_date)&(dd_min.index < end_date)].mean()

c:\mch_py_38\lib\site-packages\dask\dataframe\core.py in __getitem__(self, key)
   3260             graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self, key])
   3261             return Series(graph, name, self._meta, self.divisions)
-> 3262         raise NotImplementedError(
   3263             "Series getitem is only supported for other series objects "
   3264             "with matching partition structure"

NotImplementedError: Series getitem is only supported for other series objects with matching partition structure

CodePudding user response:

Following @mdurant's advice and fixing the issue @SultanOrazbayev pointed out worked! But I don't know what I'm doing wrong, because the solution is way slower than pandas itself.

import pandas as pd
import dask.dataframe as dd
import numpy as np

# parameters
size = 1000000
price_0 = 1.1300


# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size)  # white noise (normally distributed random data)

# random walk: price[0] = price_0 and price[i] = price[i-1] + wn[i-1].
# A cumulative sum over [price_0, wn[0], ..., wn[size-2]] evaluates exactly
# that recurrence without a million-iteration Python loop (assumes size >= 1).
price = np.cumsum(np.concatenate(([price_0], wn[:-1])))

df_min = pd.DataFrame({'date': index, 'bid': price, 'wn': wn}).set_index('date')

# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
# '1h' instead of '1H': the uppercase hourly alias is deprecated in recent pandas.
df_h = df_min['bid'].resample('1h').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)  # the next bar's open is this bar's close
# Drop rows with missing values, e.g. the last bar, whose close_date is NaT after the shift.
df_h = df_h.dropna()

dd_min = dd.from_pandas(df_min, npartitions=10)

def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return dd_min.loc[((dd_min.index >=start_date)&(dd_min.index < end_date)), 'bid'].mean().compute()

%%time
df_h.dropna(inplace=True)
res = df_h.apply(lambda row: my_func_2(row), axis=1)

Wall time: 8min 46s

Pandas only

%%time
def my_func(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min.loc[(df_min.index >=start_date)&(df_min.index < end_date), 'bid'].mean()

result_pd = df_h.apply(lambda row: my_func(row), axis=1)

Wall time: 2min 11s

This solution is out of my solution space.

Pandas pyarrow solution.

I have discarded Dask for now and decided to try a solution combining pandas and pyarrow, reading from the hourly frequency dataset in chunks of n rows. From each chunk I take the date range limits and use them as filters to read from the minute-wise parquet format data set using pyarrow:

import pandas as pd
import numpy as np
import pyarrow

# parameters
size = 1000000
price_0 = 1.1300


# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size)  # white noise (normally distributed random data)

# random walk: price[0] = price_0 and price[i] = price[i-1] + wn[i-1].
# A cumulative sum over [price_0, wn[0], ..., wn[size-2]] evaluates exactly
# that recurrence without a million-iteration Python loop (assumes size >= 1).
price = np.cumsum(np.concatenate(([price_0], wn[:-1])))

df_min = pd.DataFrame({'date': index, 'bid': price, 'wn': wn}).set_index('date')

# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
# '1h' instead of '1H': the uppercase hourly alias is deprecated in recent pandas.
df_h = df_min['bid'].resample('1h').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)  # the next bar's open is this bar's close
# Drop rows with missing values, e.g. the last bar, whose close_date is NaT after the shift.
df_h = df_h.dropna()

# SAVE FILES
df_min.to_parquet('minute_dataset.parquet')
df_h.to_csv('hourly_dataset.csv')

def my_func_2(row):
    """Return the mean minute-level bid inside [open_date, close_date).

    Looks up whatever frame the module-level name `df_min` is bound to at
    call time.
    """
    window_start = pd.to_datetime(row['open_date'])
    window_end = pd.to_datetime(row['close_date'])
    in_window = (df_min.index >= window_start) & (df_min.index < window_end)
    return df_min.loc[in_window, 'bid'].mean()

%%time
# READ-CALCULATE LOOP
full_df = list()
for df in pd.read_csv('hourly_dataset.csv', chunksize=10):
    date_from = pd.to_datetime(df.iloc[0]['open_date'])
    date_to = pd.to_datetime(df.iloc[-1]['close_date'])
    filter = [[('date', '>=', date_from), ('date', '<', date_to)]]
    df_min = pyarrow.parquet.read_table('minute_dataset.parquet', filters=filter).to_pandas()
    
    min_date_from = df_min.iloc[0].name
    min_date_to = df_min.iloc[-1].name
    
    df['mean'] = df.apply(lambda row: my_func_2(row), axis=1)
    full_df.append(df)

df_result = pd.concat(full_df)

Wall time: 1min 31s

This time the performance is within the solution space.

df_result

enter image description here

CodePudding user response:

This is a quick hack on the answer provided by OP, note there is a lot of scope for improvement, but this is just a demo of some of the ways to use dask. The snippet below ran on my machine in 30 sec, while the original snippet ran for about 52 sec. The improvement is not stellar, but there's a lot of room for optimization...

import pandas as pd
import numpy as np
import pyarrow
import dask

# parameters
size = 1000000
price_0 = 1.1300


# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size)  # white noise (normally distributed random data)

# random walk: price[0] = price_0 and price[i] = price[i-1] + wn[i-1].
# A cumulative sum over [price_0, wn[0], ..., wn[size-2]] evaluates exactly
# that recurrence without a million-iteration Python loop (assumes size >= 1).
price = np.cumsum(np.concatenate(([price_0], wn[:-1])))

df_min = pd.DataFrame({'date': index, 'bid': price, 'wn': wn}).set_index('date')

# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
# '1h' instead of '1H': the uppercase hourly alias is deprecated in recent pandas.
df_h = df_min['bid'].resample('1h').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)  # the next bar's open is this bar's close
# Drop rows with missing values, e.g. the last bar, whose close_date is NaT after the shift.
df_h = df_h.dropna()

# SAVE FILES
df_min.to_parquet('minute_dataset.parquet')
df_h.to_csv('hourly_dataset.csv')

def my_func_2(row):
    """Return the mean minute-level bid inside [open_date, close_date).

    Looks up whatever frame the module-level name `df_min` is bound to at
    call time.
    """
    window_start = pd.to_datetime(row['open_date'])
    window_end = pd.to_datetime(row['close_date'])
    in_window = (df_min.index >= window_start) & (df_min.index < window_end)
    return df_min.loc[in_window, 'bid'].mean()

@dask.delayed
def delayed_computations(df):
    """Add a 'mean' column to one chunk of hourly bars.

    Reads from 'minute_dataset.parquet' only the minute rows spanned by the
    chunk, then, for every hourly row, averages 'bid' over the half-open
    window [open_date, close_date).

    Parameters
    ----------
    df : pandas.DataFrame
        Chunk of the hourly dataset with 'open_date' and 'close_date'
        columns. It is modified in place (a 'mean' column is added).

    Returns
    -------
    pandas.DataFrame
        The same chunk, including the new 'mean' column.
    """
    # `import pyarrow` alone does not guarantee that the `parquet` submodule
    # is loaded, so import it explicitly here.
    import pyarrow.parquet as pq

    date_from = pd.to_datetime(df.iloc[0]['open_date'])
    date_to = pd.to_datetime(df.iloc[-1]['close_date'])
    # Named `row_filters` so the builtin `filter` is not shadowed.
    row_filters = [[('date', '>=', date_from), ('date', '<', date_to)]]
    minute_df = pq.read_table('minute_dataset.parquet', filters=row_filters).to_pandas()

    def hourly_mean(row):
        # Use the slice read above. Calling the module-level helper here would
        # read the global `df_min` instead, leaving the filtered read unused.
        start_date = pd.to_datetime(row['open_date'])
        end_date = pd.to_datetime(row['close_date'])
        in_window = (minute_df.index >= start_date) & (minute_df.index < end_date)
        return minute_df.loc[in_window, 'bid'].mean()

    df['mean'] = df.apply(hourly_mean, axis=1)
    return df
    
# READ-CALCULATE LOOP
# Queue one delayed task per 1000-row chunk of the hourly CSV; nothing is
# executed until dask.compute() is called below.
full_df = []
chunk_reader = pd.read_csv('hourly_dataset.csv', chunksize=1000)

for df in chunk_reader:
    task = delayed_computations(df)
    full_df.append(task)

# Run all queued tasks and stitch the processed chunks back together.
full_df = dask.compute(*full_df)
df_result = pd.concat(full_df)
  • Related