I would like to join two PySpark DataFrames on multiple columns.
tab1:
id    name (string, size=3)  val (long)
6725  fnc                    5219
8576  fnc                    829
9192  sct                    72912
782   sct                    1022
tab2:
name (string, size=6)  val (array of long)
fnceda                 [11, 25, 5219]
fncytfd                [71, 829, 320]
sctvbd                 [357, 72912, 508]
sctgsd                 [796, 52, 67]
I need to get a new table such that
the "name" in tab1 matches the first 3 letters of "name" in tab2,
and the "val" in tab1 appears in the "val" array of tab2.
All rows that do not satisfy both conditions need to be removed. Expected result:
id    name (string, size=3)  val (long)
6725  fnc                    5219
8576  fnc                    829
9192  sct                    72912
My code:
tab1.join(tab2,
    tab1["name"] == F.substring(tab2["name"], 1, 3)
    & F.array_contains(tab2["val"], tab1["val"]),
    "inner"
)
Got error:
Column is not iterable
It seems that an array column cannot be used as a join condition?
Thanks
CodePudding user response:
This can be accomplished in 3 steps.
Step 1: Create a new column in tab2 by taking the first 3 characters of name (note that Spark's substring is 1-based):
from pyspark.sql.functions import substring, explode
tab2_df = tab2_df.withColumn('new_name', substring('name', 1, 3))
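With the sample data from the question, tab2_df.show() would now produce something like:
+-------+-----------------+--------+
|   name|              val|new_name|
+-------+-----------------+--------+
| fnceda|   [11, 25, 5219]|     fnc|
|fncytfd|   [71, 829, 320]|     fnc|
| sctvbd|[357, 72912, 508]|     sct|
| sctgsd|    [796, 52, 67]|     sct|
+-------+-----------------+--------+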
Step 2: Explode tab2.val so each row carries a single long value instead of an array of longs:
tab2_df = tab2_df.withColumn('value', explode('val'))
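explode produces one output row per array element, so the four rows above become twelve. For the fnceda row, for example:
+------+--------------+--------+-----+
|  name|           val|new_name|value|
+------+--------------+--------+-----+
|fnceda|[11, 25, 5219]|     fnc|   11|
|fnceda|[11, 25, 5219]|     fnc|   25|
|fnceda|[11, 25, 5219]|     fnc| 5219|
+------+--------------+--------+-----+
(remaining rows omitted)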
Step 3: Perform an inner join between tab1 and tab2 by comparing name with new_name, and val with value:
tab3_df = tab1_df.join(tab2_df, [(tab1_df.name == tab2_df.new_name) & (tab1_df.val == tab2_df.value)], how="inner")
display(tab3_df)
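Putting it all together, here is a self-contained sketch of this approach. The DataFrame construction below is assumed for illustration, since the question does not show how tab1/tab2 were built:

from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, explode

spark = SparkSession.builder.getOrCreate()

# Sample data copied from the question (schemas assumed)
tab1_df = spark.createDataFrame(
    [(6725, "fnc", 5219), (8576, "fnc", 829),
     (9192, "sct", 72912), (782, "sct", 1022)],
    ["id", "name", "val"])
tab2_df = spark.createDataFrame(
    [("fnceda", [11, 25, 5219]), ("fncytfd", [71, 829, 320]),
     ("sctvbd", [357, 72912, 508]), ("sctgsd", [796, 52, 67])],
    ["name", "val"])

# Steps 1 and 2: derive the 3-letter prefix and flatten the array
tab2_df = (tab2_df
           .withColumn("new_name", substring("name", 1, 3))
           .withColumn("value", explode("val")))

# Step 3: plain equi-join on the derived columns
tab3_df = tab1_df.join(
    tab2_df,
    (tab1_df.name == tab2_df.new_name) & (tab1_df.val == tab2_df.value),
    "inner")
tab3_df.show()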
CodePudding user response:
You need to wrap your first condition in parentheses, then you'll be fine. In Python, & binds more tightly than ==, so without the parentheses the condition is parsed as tab1['name'] == (F.substring(...) & F.array_contains(...)) rather than as two separate conditions combined with &.
df.join(
    df2,
    (df['name'] == F.substring(df2['name'], 1, 3)) & F.array_contains(df2['val'], df['val']),
    'inner'
).show()
+----+----+-----+-------+-----------------+
|  id|name|  val|   name|              val|
+----+----+-----+-------+-----------------+
|6725| fnc| 5219| fnceda|   [11, 25, 5219]|
|8576| fnc|  829|fncytfd|   [71, 829, 320]|
|9192| sct|72912| sctvbd|[357, 72912, 508]|
+----+----+-----+-------+-----------------+
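Note that the joined result keeps both tables' name and val columns. If you only want tab1's columns back, one option (assuming the same df/df2 names as above) is to select through the original DataFrame reference:

result = df.join(
    df2,
    (df['name'] == F.substring(df2['name'], 1, 3)) & F.array_contains(df2['val'], df['val']),
    'inner'
)
result.select(df['*']).show()  # keeps only tab1's id, name, val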