dataset_a =

zid  code  number
a1   abc   4.568
a2   adc   4.368
a3   asc   4.566
a4   bde   5.568
a5   ghi   7.969
a6   gji   7.475
dataset_b =

col  code  series
55   abc   1
22   adc   1
44   asc   2
11   asv   2
66   bde   3
77   trd   4
88   ghi   5
89   gji   5
90   gpi   5
I want to look at all the codes present in both datasets, per series. So the desired output is:

code  series
abc   1
adc   1
bde   3

I did a left join, and the code asc is also taken into consideration, but since not all codes for series 2 are present in dataset_a, I just want to ignore that series. Same story for series 5.

Can I groupBy and then join?
CodePudding user response:
You can do 2 joins to mark rows which need to be kept:
import pyspark.sql.functions as F

# "exists" is 1 if the code is present in dataset_a, 0 otherwise
df = dataset_b.join(
    dataset_a.select("code").distinct().withColumn("exists", F.lit(1)),
    on="code",
    how="left",
).fillna(0)

# "keep" is True only if all rows in the series have "exists" == 1
df_keep = df.groupBy("series").agg(
    (F.count("exists") == F.sum("exists")).alias("keep")
)

# final join; filter out rows where "keep" is not True
dataset_b.join(df_keep, on="series", how="left").where(F.col("keep") == True).select(
    "code", "series"
).show()
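The keep-per-series rule this computes ("a series survives only if every one of its codes appears in dataset_a") can be checked with plain Python sets. This is just an illustrative sketch of the logic, not PySpark; the names codes_a and rows_b are stand-ins for the two datasets:

codes_a = {"abc", "adc", "asc", "bde", "ghi", "gji"}  # codes in dataset_a

rows_b = [("abc", 1), ("adc", 1), ("asc", 2), ("asv", 2), ("bde", 3),
          ("trd", 4), ("ghi", 5), ("gji", 5), ("gpi", 5)]  # (code, series) of dataset_b

# group the codes of dataset_b by series
by_series = {}
for code, series in rows_b:
    by_series.setdefault(series, []).append(code)

# a series is kept only if all of its codes exist in codes_a
kept = {s for s, codes in by_series.items() if all(c in codes_a for c in codes)}
result = [(code, series) for code, series in rows_b if series in kept]
print(result)  # [('abc', 1), ('adc', 1), ('bde', 3)]

Series 2 is dropped because asv is missing from codes_a, and series 5 because gpi is, which matches the desired output.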
CodePudding user response:
import pyspark.sql.functions as F

df_a = spark.createDataFrame(
    [["a1", "abc", 45], ["a2", "adc", 43], ["a3", "asc", 43],
     ["a4", "bde", 43], ["a2", "ghi", 43]],
    ["zid", "code", "number"],
)
df_b = spark.createDataFrame(
    [["55", "abc", 1], ["22", "adc", 1], ["44", "asc", 2],
     ["11", "asv", 2], ["66", "bde", 3], ["77", "trd", 4],
     ["88", "ghi", 5], ["89", "gji", 5], ["90", "gpi", 5]],
    ["col", "code", "series"],
)

# codes of df_b that also appear in df_a, together with their series
df_join = df_a.join(df_b, df_a.code == df_b.code, "left").select(df_a.code, df_b.series)

# number of matched codes per series ...
df_join_agg = df_join.groupby("series").agg(F.count("series").alias("keep"))

# ... versus the total number of codes per series in df_b
df_b_agg = df_b.groupby("series").agg(F.count("series").alias("keep"))
df_b_join = df_b_agg.join(df_b, on="series", how="left")

# keep only the series where both counts match, i.e. every code exists in df_a
df_final = df_b_join.join(
    df_join_agg,
    [df_b_join.series == df_join_agg.series, df_b_join.keep == df_join_agg.keep],
    "inner",
).drop(df_join_agg.series).drop(df_join_agg.keep)

df_final.select("code", "series").show()
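This answer works by comparing, per series, the number of codes matched in df_a against the total number of codes in df_b, and keeping a series only when the two counts agree. A plain-Python sketch of that count comparison (the names are illustrative, not from the answer's code):

from collections import Counter

codes_a = {"abc", "adc", "asc", "bde", "ghi"}  # codes in this answer's df_a
rows_b = [("abc", 1), ("adc", 1), ("asc", 2), ("asv", 2), ("bde", 3),
          ("trd", 4), ("ghi", 5), ("gji", 5), ("gpi", 5)]  # (code, series) of df_b

# matched codes per series (the role of df_join_agg)
matched = Counter(series for code, series in rows_b if code in codes_a)
# total codes per series (the role of df_b_agg)
total = Counter(series for _, series in rows_b)

# the inner join on equal counts keeps exactly the series where the two agree
kept = [(code, series) for code, series in rows_b if matched[series] == total[series]]
print(kept)  # [('abc', 1), ('adc', 1), ('bde', 3)]

Note this count-matching assumes the codes within each dataset are distinct; duplicate codes in df_a would inflate the matched count.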