I have code that calls a Dask @delayed
function which takes N Dask DataFrames as input and returns a Dask DataFrame as output.
There are two problems: (1) inside the function, the type of the dataframe is pandas instead of Dask, and (2) the result I get back from the function is also pandas instead of Dask.
Why does @delayed
pass pandas DataFrames as input instead of Dask DataFrames? I need to work only with Dask DataFrames.
This is the code:
import pandas as pd
import dask.dataframe as dd
from dask import delayed

df = pd.DataFrame({
    'height': [6.21, 5.12, 5.85, 5.78, 5.98],
    'weight': [150, 126, 133, 164, 203]
})
df_dask = dd.from_pandas(df, npartitions=2)

@delayed
def some_function(*b):
    print('type b[0]: ' + str(type(b[0])))
    ddf = b[0]
    return ddf

ddfout = some_function(df_dask, df_dask, df_dask)
computed = ddfout.compute()
>>> type b[0]: <class 'pandas.core.frame.DataFrame'> # this should be dask dataframe
type(computed)
>>> pandas.core.frame.DataFrame
CodePudding user response:
There is no need for delayed here: the dask.dataframe API already provides lazy versions of the pandas methods it supports (which is most of them).
Delayed is for arbitrary operations on constants and other delayed values, not for Dask collections like the dataframe.
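To illustrate the intended use of delayed, here is a minimal sketch operating on plain Python values (the `inc`/`add` functions are made up for illustration): each call builds a node in a task graph, and nothing runs until .compute().

```python
from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(a, b):
    return a + b

# Builds a task graph; no work happens yet
total = add(inc(1), inc(2))

# Executes the graph: inc(1) -> 2, inc(2) -> 3, add -> 5
print(total.compute())  # 5
```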
See the documentation: https://docs.dask.org/en/stable/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections ; you probably wanted ddf.map_partitions.