I want to use two Dask DataFrames to process large CSV files, and I need to do a groupby(...).apply(...).reset_index() on one DataFrame before joining it with the other:
import pandas as pd
import dask.dataframe as dd
dfA = pd.DataFrame({'x': ["x1", "x2", "x2", "x1", "x3", "x2"],
                    'y': ["A", "B", "C", "B", "D", "E"]})
ddfA = dd.from_pandas(dfA, npartitions=2)
gA = ddfA.groupby('x').y.apply(list, meta=('y', 'str')).reset_index()
dfB = pd.DataFrame({'x': ["x1", "x2", "x3"],
                    'z': ["U", "V", "W"]})
ddfB = dd.from_pandas(dfB, npartitions=2)
gA.merge(ddfB, how='left', on='x')
Unfortunately, I get a KeyError: 'x'. Can anyone help me solve this problem?
CodePudding user response:
I'm not sure what your desired output is, but if you change the order of the operations (merge in pandas first, then convert to Dask) you can do it:
import pandas as pd
import dask.dataframe as dd
dfA = pd.DataFrame({'x': ["x1", "x2", "x2", "x1", "x3", "x2"],
                    'y': ["A", "B", "C", "B", "D", "E"]})
dfB = pd.DataFrame({'x': ["x1", "x2", "x3"],
                    'z': ["U", "V", "W"]})
gA = dfA.merge(dfB, how='left', on='x')
gA = dd.from_pandas(gA, npartitions=2)
gA
                   x       y       z
npartitions=2
0             object  object  object
3                ...     ...     ...
5                ...     ...     ...
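To see the actual merged rows instead of the lazy structure, call .compute(). Note that this approach merges row by row and never aggregates y into lists, so check that it matches the output you had in mind:
print(gA.compute())
    x  y  z
0  x1  A  U
1  x2  B  V
2  x2  C  V
3  x1  B  U
4  x3  D  W
5  x2  E  V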
CodePudding user response:
It looks like agg(list) solves the issue:
import pandas as pd
import dask.dataframe as dd

dfA = pd.DataFrame(
    {"x": ["x1", "x2", "x2", "x1", "x3", "x2"], "y": ["A", "B", "C", "B", "D", "E"]}
)
ddfA = dd.from_pandas(dfA, npartitions=2)
gA = ddfA.groupby("x").y.agg(list).reset_index()
dfB = pd.DataFrame({"x": ["x1", "x2", "x3"], "z": ["U", "V", "W"]})
ddfB = dd.from_pandas(dfB, npartitions=2)
print(gA.merge(ddfB, on="x", how="left").compute())
    x          y  z
0  x1     [A, B]  U
1  x2  [B, C, E]  V
2  x3        [D]  W
If one of the DataFrames is much smaller than the other, you may want to look into a broadcast join, since that will be a lot more performant.
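A minimal sketch of one way to get a broadcast-style join, assuming the lookup table is small enough to fit in memory: keep the small side as a plain pandas DataFrame, and Dask ships it to every partition of the large frame instead of shuffling both sides (recent Dask versions also accept a broadcast= hint on merge):
import pandas as pd
import dask.dataframe as dd

dfA = pd.DataFrame(
    {"x": ["x1", "x2", "x2", "x1", "x3", "x2"], "y": ["A", "B", "C", "B", "D", "E"]}
)
ddfA = dd.from_pandas(dfA, npartitions=2)
gA = ddfA.groupby("x").y.agg(list).reset_index()

# Keeping the small table as plain pandas means Dask broadcasts it to
# every partition of gA rather than shuffling the large side.
small = pd.DataFrame({"x": ["x1", "x2", "x3"], "z": ["U", "V", "W"]})
print(gA.merge(small, on="x", how="left").compute())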