Spark- check intersect of two string columns

Time:11-17

I have a dataframe below where colA and colB contain strings. I'm trying to check whether colB contains any substring of the values in colA. The values can contain a comma or a space, but as long as any part of colB's string overlaps with colA's, it is a match. For example, row 1 below has an overlap ("bc") and row 2 does not.

I was thinking of splitting the values into arrays but the delimiters are not constant. Could someone please help to shed some light on how to do this? Many thanks for your help.

    +---+-------+-----------+
    | id|colA   | colB      |
    +---+-------+-----------+
    |  1|abc d  |  bc, z    |
    |  2|abcde  |  hj f     |
    +---+-------+-----------+

CodePudding user response:

You can use a custom UDF to implement the intersection logic, as shown below. Note that it intersects the two strings character by character.

Data Preparation

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

from pyspark.sql.types import StringType

import pandas as pd

data = {"id" :[1,2],
    "colA" : ["abc d","abcde"],
    "colB" : ["bc, z","hj f"]}
mypd = pd.DataFrame(data)

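# `sql` was never defined in the original snippet; create the SQLContext here
sc = SparkContext.getOrCreate()
sql = SQLContext(sc)
sparkDF = sql.createDataFrame(mypd)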

sparkDF.show()

+---+-----+-----+
| id| colA| colB|
+---+-----+-----+
|  1|abc d|bc, z|
|  2|abcde| hj f|
+---+-----+-----+

UDF

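def str_intersect(x,y):
    # Null column values arrive as None; nothing to intersect in that case
    if x is None or y is None:
        return None
    # Character-level intersection: set(x) holds the distinct characters of x
    res = set(x) & set(y)
    if res:
        # Set order is arbitrary, so the joined characters may come out in any order
        return ''.join(res)
    else:
        return None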

str_intersect_udf = F.udf(lambda x,y:str_intersect(x,y),StringType())

sparkDF.withColumn('intersect',str_intersect_udf(F.col('colA'),F.col('colB'))).show()

+---+-----+-----+---------+
| id| colA| colB|intersect|
+---+-----+-----+---------+
|  1|abc d|bc, z|      bc |
|  2|abcde| hj f|     null|
+---+-----+-----+---------+
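
One caveat with the character-set approach above: spaces and commas are characters too, so two values that share only a delimiter (for example "abc d" and "hj f") would be reported as a match. Below is a minimal sketch of one way around that, assuming the delimiters are limited to commas, whitespace, and hyphens. The names `char_overlap` and `DELIMS` are hypothetical and not part of the answer.

```python
import re

# Assumption: only commas, whitespace, and hyphens act as delimiters.
DELIMS = re.compile(r"[,\s-]+")


def char_overlap(x, y):
    """Return the sorted characters shared by x and y, ignoring delimiters."""
    if x is None or y is None:
        return None
    shared = set(DELIMS.sub("", x)) & set(DELIMS.sub("", y))
    # Sorting makes the result deterministic, unlike joining a raw set.
    return "".join(sorted(shared)) if shared else None


print(char_overlap("abc d", "bc, z"))  # bc
print(char_overlap("abcde", "hj f"))   # None
print(char_overlap("abc d", "hj f"))   # None (the shared space is ignored)
```

If this behaviour is what you want, the function could be wrapped with `F.udf(char_overlap, StringType())` in place of `str_intersect`.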

CodePudding user response:

You could split both columns with a regex and then use a UDF to check for substrings.

Example:

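from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()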
data = [
    {"id": 1, "A": "abc d", "B": "bc, z, d"},
    {"id": 2, "A": "abc-d", "B": "acb, abc"},
    {"id": 3, "A": "abcde", "B": "hj f ab"},
]
df = spark.createDataFrame(data)
# Raw string so that \s reaches the regex engine unchanged
split_regex = r"((,)?\s|[-])"
df = df.withColumn("A", F.split(F.col("A"), split_regex))
df = df.withColumn("B", F.split(F.col("B"), split_regex))


def mapper(a, b):
    result = []
    for ele_b in b:
        for ele_a in a:
            if ele_b in ele_a:
                result.append(ele_b)
    return result


df = df.withColumn(
    "result", F.udf(mapper, ArrayType(StringType()))(F.col("A"), F.col("B"))
)

Result:

root
 |-- A: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- B: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- result: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------+-----------+---+-------+
|A       |B          |id |result |
+--------+-----------+---+-------+
|[abc, d]|[bc, z, d] |1  |[bc, d]|
|[abc, d]|[acb, abc] |2  |[abc]  |
|[abcde] |[hj, f, ab]|3  |[ab]   |
+--------+-----------+---+-------+
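
To sanity-check the splitting and matching logic without a Spark session, here is a plain-Python sketch of the same idea. It differs from the Spark version in a few ways, all of which are my assumptions. The pattern uses a non-capturing group, because Python's `re.split` would otherwise return the captured delimiters. Empty tokens are dropped, because `"" in s` is always true and an empty token would match everything. Each token of B is reported at most once. The names `tokens` and `SPLIT_RE` are hypothetical.

```python
import re

# Non-capturing equivalent of the split pattern used in the answer.
SPLIT_RE = re.compile(r"(?:,?\s|-)")


def tokens(s):
    """Split on an optional comma plus whitespace, or on '-', dropping empty pieces."""
    return [t for t in SPLIT_RE.split(s) if t]


def mapper(a, b):
    """Return each token of b that is a substring of at least one token of a."""
    return [ele_b for ele_b in b if any(ele_b in ele_a for ele_a in a)]


rows = [("abc d", "bc, z, d"), ("abc-d", "acb, abc"), ("abcde", "hj f ab")]
for a, b in rows:
    print(tokens(a), tokens(b), mapper(tokens(a), tokens(b)))
```

For the three sample rows this gives the same `result` lists as the table above.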