Find element of a column which has the most common tokens with another column?

Time:07-27

How can I convert the following pandas code to PySpark?

import pandas as pd
import numpy as np

df["col2_set"] = df["col2"].apply(lambda x: set(x.split(" ")))
def setter(x):
    data = x.col1.split(",")
    res = np.array([len(x.col2_set.intersection(y.split(" "))) for y in data])
    if res.sum() == 0:
        return None
    else:
        return data[res.argmax()]
df['match'] = df.apply(lambda x: setter(x), axis=1)
df.drop(columns=['col2_set'], inplace=True)

Explanation: the following line turns each col2 value into a set of its space-separated tokens:

df["col2_set"] = df["col2"].apply(lambda x: set(x.split(" ")))

The setter function is applied to each row of the DataFrame. It splits the row's col1 value on ',' into a list, then selects the element of that list that has the most tokens in common with col2. If no element shares any token with col2, it returns None.

Example:

key   col1                                                         col2
ab    'summer hot, best friend, not possible, apple, let it go'    "let be hot"
cd    'do it better, I am sa'                                      "I need to go"
fg    'my best post, fun sunday'                                   "it's great"

Output:

key   col1                                                         col2              match
ab    'summer hot, best friend, not possible, apple, let it go'    "let be hot"      "let it go"
cd    'do it better, I am sa'                                      "I need to go"    "I am sa"
fg    'my best post, fun sunday'                                   "it's great"      None

In the first row, col2 shares one token with both summer hot and let it go; it doesn't matter which of the two is selected.
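To make the rule concrete without pandas, here is a minimal plain-Python sketch of the same per-row logic. The function name best_match is made up for this illustration; it is meant to mirror setter above, including the fact that splitting on ',' alone leaves a leading space on later elements.

```python
def best_match(col1, col2):
    """Return the comma-separated piece of col1 that shares the most tokens with col2."""
    col2_tokens = set(col2.split(" "))
    candidates = col1.split(",")  # later pieces keep their leading space
    counts = [len(col2_tokens.intersection(c.split(" "))) for c in candidates]
    if sum(counts) == 0:
        return None  # nothing in col1 shares a token with col2
    # index(max(...)) returns the first maximum, like np.argmax
    return candidates[counts.index(max(counts))]


print(best_match("summer hot, best friend, not possible, apple, let it go", "let be hot"))
print(best_match("my best post, fun sunday", "it's great"))
```

On the first row both summer hot and let it go have one token in common with col2, and this sketch returns whichever comes first in col1.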

CodePudding user response:

You can get the desired result with the higher-order function transform:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('ab', 'summer hot, best friend, not possible, apple, let it go', "let be hot"),
     ('cd', 'do it better, I am sa', "I need to go"),
     ('fg', 'my best post, fun sunday', "it's great")],
    ['key', 'col1', 'col2'])

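# Split col1 on commas (dropping any spaces after each comma) and col2 on spaces.
c1_arr = F.split('col1', ', *')
c2_arr = F.split('col2', ' ')
# For each col1 element, build a struct: (count of distinct tokens shared with col2, element).
arr_of_struct = F.transform(
    c1_arr,
    lambda x: F.struct(
        F.size(F.array_intersect(c2_arr, F.split(x, ' '))).alias('cnt'),
        x.alias('val'),
    )
)
# Sort the structs in descending order (by cnt, then val) and take the first one.
top_val = F.sort_array(arr_of_struct, False)[0]
# Keep the value only when at least one token matched; otherwise match stays null.
df = df.withColumn('match', F.when(top_val['cnt'] > 0, top_val['val']))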

df.show(truncate=0)
# +---+-------------------------------------------------------+------------+----------+
# |key|col1                                                   |col2        |match     |
# +---+-------------------------------------------------------+------------+----------+
# |ab |summer hot, best friend, not possible, apple, let it go|let be hot  |summer hot|
# |cd |do it better, I am sa                                  |I need to go|I am sa   |
# |fg |my best post, fun sunday                               |it's great  |null      |
# +---+-------------------------------------------------------+------------+----------+

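This covers what was asked in the question, and, as per your comments, it should be fine, since you don't care which of the tied max-match values from col1 goes into the result. For what it's worth, though, the two scripts are not identical. The pandas version returns the first element with the highest count and keeps any leading whitespace left by splitting on ',' alone, while the Spark version sorts the structs in descending order, so ties on cnt are broken by the larger val. Some odd edge cases therefore come out differently: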

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('ab', 'summer hot, best friend, not possible, apple, let it go', "let be hot"),
     ('cd', 'do it better, I am sa', "I need to go"),
     ('ed', 'q w,e r,t y', "q e r"),
     ('zz', 'q w,e r, p p o, t y', "q e r p o"),
     ('yy', 'q w,p p o, e r, t y', "q e r p o"),
     ('cc', 'q w,e r p,   e r y', "e e r"),
     ('vv', 'q w,e r y,   e r p', "e e r"),
     ('fg', 'my best post, fun sunday', "it's great")],
    ['key', 'col1', 'col2'])


df = df.toPandas()

import pandas as pd
import numpy as np
df["col2_set"] = df["col2"].apply(lambda x: set(x.split(" ")))
def setter(x):
    data = x.col1.split(",")
    res = np.array([len(x.col2_set.intersection(y.split(" "))) for y in data])
    if res.sum() == 0:
        return None
    else:
        return data[res.argmax()]
df['match_pandas'] = df.apply(lambda x: setter(x), axis=1)
df.drop(columns=['col2_set'], inplace=True)

df = spark.createDataFrame(df)

c1_arr = F.split('col1', ', *')
c2_arr = F.split('col2', ' ')
arr_of_struct = F.transform(
    c1_arr,
    lambda x: F.struct(
        F.size(F.array_intersect(c2_arr, F.split(x, ' '))).alias('cnt'),
        x.alias('val'),
    )
)
top_val = F.sort_array(arr_of_struct, False)[0]
df = df.withColumn('match_spark', F.when(top_val['cnt'] > 0, top_val['val']))

df.show(truncate=0)
# +---+-------------------------------------------------------+------------+------------+-----------+
# |key|col1                                                   |col2        |match_pandas|match_spark|
# +---+-------------------------------------------------------+------------+------------+-----------+
# |ab |summer hot, best friend, not possible, apple, let it go|let be hot  |summer hot  |summer hot |
# |cd |do it better, I am sa                                  |I need to go| I am sa    |I am sa    |
# |ed |q w,e r,t y                                            |q e r       |e r         |e r        |
# |zz |q w,e r, p p o, t y                                    |q e r p o   |e r         |p p o      |
# |yy |q w,p p o, e r, t y                                    |q e r p o   |p p o       |p p o      |
# |cc |q w,e r p,   e r y                                     |e e r       |e r p       |e r y      |
# |vv |q w,e r y,   e r p                                     |e e r       |e r y       |e r y      |
# |fg |my best post, fun sunday                               |it's great  |null        |null       |
# +---+-------------------------------------------------------+------------+------------+-----------+
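
Rows zz and cc differ only in how ties are broken. As a rough illustration, here is a pure-Python emulation of the two rules. The names pandas_style and spark_style are made up for this sketch, and the Spark side is only imitated with re.split and tuple sorting (assuming plain ASCII strings, where Python's string ordering should agree with Spark's); it is not how Spark actually evaluates the expression.

```python
import re


def shared(tokens, piece):
    """Number of distinct tokens that piece has in common with tokens."""
    return len(tokens & set(piece.split(" ")))


def pandas_style(col1, col2):
    """First element with the highest count; split on ',' only (as in setter)."""
    tokens = set(col2.split(" "))
    candidates = col1.split(",")
    counts = [shared(tokens, c) for c in candidates]
    if sum(counts) == 0:
        return None
    return candidates[counts.index(max(counts))]


def spark_style(col1, col2):
    """Largest (cnt, val) pair after a descending sort; split on ', *'."""
    tokens = set(col2.split(" "))
    candidates = re.split(r", *", col1)
    ranked = sorted(((shared(tokens, c), c) for c in candidates), reverse=True)
    cnt, val = ranked[0]
    return val if cnt > 0 else None


for col1, col2 in [("q w,e r, p p o, t y", "q e r p o"),
                   ("q w,e r p,   e r y", "e e r")]:
    print(repr(pandas_style(col1, col2)), repr(spark_style(col1, col2)))
```

If the pandas tie-breaking matters, the position of each element would have to become part of the sort key on the Spark side; the sketch above only shows where the difference comes from.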