Compare a column against a dictionary in Dask


I have a dictionary:

dict = {10: 1, 50: 2, 200: 3, 500: 4}

And a Dask DataFrame:

+---+---+
|  a|  b|
+---+---+
|  1| 24|
|  1| 49|
|  2|125|
|  3|400|
+---+---+

I want to groupBy a and get the minimum b value. After that, I want to check which dict key is closest to b and create a new column with the dict value.

As an example, when b=24 the closest key is 10, so I want to assign the value 1. This is the result I am expecting:

+---+---+-------+
|  a|  b|closest|
+---+---+-------+
|  1| 24|      1|
|  1| 49|      2|
|  2|125|      3|
|  3|400|      4|
+---+---+-------+
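
To make the rule concrete, here is roughly how I picture the closest-key lookup for a single value (just an illustrative sketch, not code I already have; closest_value is a made-up helper):

import numpy as np

mapping = {10: 1, 50: 2, 200: 3, 500: 4}

def closest_value(b, mapping=mapping):
    # pick the dict key with the smallest absolute distance to b
    keys = np.array(list(mapping.keys()))
    closest_key = keys[np.abs(keys - b).argmin()]
    return mapping[closest_key]

closest_value(24)  # 10 is the nearest key to 24, so this returns 1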

I have found something similar with PySpark. I have not been able to make it run, but it apparently runs for other people. I am sharing it anyway for reference.

import builtins
from pyspark.sql.functions import min

df = spark.createDataFrame(
    [
        (1, 24),
        (1, 49),
        (2, 125),
        (3, 400)
    ],
    ["a", "b"]
)

dict = {10:1, 50:2, 200: 3, 500: 4}

def func(value, dict):
    closest_key = (
        value if value in dict else builtins.min(
            dict.keys(), key=lambda k: builtins.abs(k - value)
        )
    )
    score = dict.get(closest_key)
    return score

df = (
    df.groupby('a')
        .agg(
            min('b')
        )
    ).withColumn('closest', func('b', dict))


From what I understand, in the Spark version the calculation was done per row, and I have not been able to replicate that.

CodePudding user response:

Here is another approach for you, friend. This will return a NumPy array, but it will be faster than Spark, and you can easily reindex it.

import numpy as np

a = pydf.to_numpy()  # assuming pydf is a pandas DataFrame (e.g. ddf.compute())
a = a[:, 1]          # grab your b column
# pick the value for the first threshold each b falls under
np.select([a <= 10, a <= 50, a <= 200, a <= 500], [1, 2, 3, 4], a)
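
If you want the result back on the frame as a column rather than as a bare array, one way (still a sketch, assuming pydf is a pandas DataFrame) is:

closest = np.select([a <= 10, a <= 50, a <= 200, a <= 500], [1, 2, 3, 4], a)
pydf = pydf.assign(closest=closest)  # reattach the result as a new column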

CodePudding user response:

Instead of thinking of it as a row-wise operation, you can think of it as a partition-wise operation. If my interpretation is off, you can still use this sample I wrote for the most part, with a few tweaks.

I will show a solution with Fugue that lets you just define your logic in Pandas, and then bring it to Dask. This will return a Dask DataFrame.

First, some setup. Note that df is a Pandas DataFrame; it is meant to represent a smaller sample you can test on:

import pandas as pd
import dask.dataframe as dd
import numpy as np

_dict = {10: 1, 50: 2, 200: 3, 500: 4}
df = pd.DataFrame({"a": [1,1,2,3], "b":[24,49,125,400]})
ddf = dd.from_pandas(df, npartitions=2)

Then we define the logic. It is written to handle one partition, so everything in column a will already be the same value.

def logic(df: pd.DataFrame) -> pd.DataFrame:
    # handles the logic for 1 group. all values in a are the same
    min_b = df['b'].min()
    keys = np.array(list(_dict.keys()))
    # closest taken from https://stackoverflow.com/a/10465997/11163214
    closest = keys[np.abs(keys - min_b).argmin()]
    closest_val = _dict[closest]
    df = df.assign(closest=closest_val)
    return df

We can test this on Pandas:

logic(df.loc[df['a'] == 1])

and we'll get:

    a   b   closest
0   1   24  1
1   1   49  1

Then we can bring it to Dask with Fugue. We just need to call the transform function:

from fugue import transform

ddf = transform(ddf,
          logic,
          schema="*,closest:int",
          partition={"by":"a"},
          engine="dask")
ddf.compute()

This can take in either Pandas or Dask DataFrames and will output a Dask DataFrame because we specified the "dask" engine. There is also a "spark" engine if you want a Spark DataFrame.

Schema is a requirement for distributed computing, so we specify the output schema here. We also partition by column a.
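
For example, if you wanted a Spark DataFrame instead, the same call could be pointed at the Spark engine (a sketch, assuming pyspark is installed and a Spark session can be created):

from fugue import transform

sdf = transform(
    df,                     # the small pandas sample from above
    logic,
    schema="*,closest:int",
    partition={"by": "a"},
    engine="spark",
)
sdf.show()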
