Finding matching records in two datasets per unique value in another column


dataset_a =

zid   code   number
a1    abc    4.568
a2    adc    4.368
a3    asc    4.566
a4    bde    5.568
a5    ghi    7.969
a6    gji    7.475

dataset_b =

col   code   series
55    abc         1
22    adc         1
44    asc         2
11    asv         2
66    bde         3
77    trd         4
88    ghi         5
89    gji         5
90    gpi         5

For each series, I want to keep its codes only if every code of that series is present in both datasets. So the desired output is:

code   series
abc         1
adc         1
bde         3

I did a left join, and the code asc is also taken into consideration, but since not all codes for series 2 are present in dataset_a, I want to ignore that series entirely. Same story for series 5.
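
Roughly, the attempt looks like this (a sketch; the exact join I used may differ, and dataset_a / dataset_b are assumed to already be Spark DataFrames with the columns shown above):

naive = dataset_a.join(dataset_b, on="code", how="left").select("code", "series")
naive.show()
# asc (series 2) and ghi, gji (series 5) show up here as well, because a plain
# join does not check whether the whole series is covered by dataset_a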

Can I groupBy and then join?

CodePudding user response:

You can do two joins to mark the rows that need to be kept:

import pyspark.sql.functions as F

# "exists" is 1 if code is present in dataset_a or 0 otherwise
df = dataset_b.join(
    dataset_a.select("code").distinct().withColumn("exists", F.lit(1)),
    on="code",
    how="left",
).fillna(0)

# "keep" is True only if all rows in the series have "exists" 1
df_keep = df.groupBy("series").agg((F.count("exists") == F.sum("exists")).alias("keep"))

# final join, keeping only the rows whose series has "keep" == True
dataset_b.join(df_keep, on="series", how="left").where(F.col("keep") == True).select(
    "code", "series"
).show()
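
A shorter variant of the same idea (a sketch, assuming the same dataset_a and dataset_b DataFrames as in the question): first find the series that contain at least one code missing from dataset_a, then anti-join those series away from dataset_b.

# series that contain at least one code missing from dataset_a
bad_series = (
    dataset_b.join(dataset_a.select("code").distinct(), on="code", how="left_anti")
    .select("series")
    .distinct()
)

# drop every row of dataset_b that belongs to such a series
dataset_b.join(bad_series, on="series", how="left_anti").select("code", "series").show()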

CodePudding user response:

import pyspark.sql.functions as F

# sample data adapted from the question
df_a = spark.createDataFrame([["a1", "abc", 45], ["a2", "adc", 43], ["a3", "asc", 43], ["a4", "bde", 43], ["a2", "ghi", 43]], ["zid", "code", "number"])

df_b = spark.createDataFrame([["55", "abc", 1], ["22", "adc", 1], ["44", "asc", 2], ["11", "asv", 2], ["66", "bde", 3], ["77", "trd", 4], ["88", "ghi", 5], ["89", "gji", 5], ["90", "gpi", 5]], ["col", "code", "series"])

# codes of df_a with the series they match in df_b (unmatched codes get a null series)
df_join = df_a.join(df_b, df_a.code == df_b.code, "left").select(df_a.code, df_b.series)

# number of matched codes per series
df_join_agg = df_join.groupBy("series").agg(F.count("series").alias("keep"))

# total number of codes per series in df_b
df_b_agg = df_b.groupBy("series").agg(F.count("series").alias("keep"))

df_b_join = df_b_agg.join(df_b, on="series", how="left")

# keep a series only when the matched count equals the total count,
# i.e. every code of that series is present in df_a
df_final = df_b_join.join(df_join_agg, [df_b_join.series == df_join_agg.series, df_b_join.keep == df_join_agg.keep], "inner").drop(df_join_agg.series).drop(df_join_agg.keep)

df_final.select("code", "series").show()
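
With the sample frames above, the final select should print the abc and adc rows for series 1 and the bde row for series 3, matching the desired output in the question (the row order of show() is not guaranteed).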
