I want a new column DATE1 equal to the column START in dataframe 1 (DF1), where DF1 is joined on KEY1 to dataframe 2 (DF2) on KEY2, so that DATE1 is shown only when the keys match in the join. I can show the column START, but it shows all rows.
I also want a column DATE2 equal to the column START in DF1 on KEY1, but joined to DF2 on a different key called KEY3, so that DATE2 is shown only when the keys match in that join. I can show the column START, but I am not sure how to show it only for matches when joining on two different keys.
Example input for DF1 would be:
+--------+----+-----+-----+
|   START|KEY1|Color|OTHER|
+--------+----+-----+-----+
|10/05/21|   1|White| 3000|
|10/06/21|   2| Blue| 4100|
|10/07/21|   3|Green| 6200|
+--------+----+-----+-----+
DF2 input would be:
+----+----+------+
|KEY2|KEY3|NUMBER|
+----+----+------+
|   1|   2|  3000|
|   2|   3|  4100|
|   3|   1|  6200|
+----+----+------+
Output would be something like below:
+--------+--------+
|   DATE1|   DATE2|
+--------+--------+
|10/05/21|10/06/21|
|10/06/21|10/07/21|
|10/07/21|10/05/21|
+--------+--------+
Below is my code:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def transform_df_data(df: DataFrame) -> DataFrame:
    # Copy START into DATE1 and DATE2; keep KEY1 for the join.
    return (
        df.withColumn("DATE1", col("START"))
        .withColumn("DATE2", col("START"))
        .select("KEY1", "DATE1", "DATE2")
    )

def build_final_df(df: DataFrame, otherdf: DataFrame) -> DataFrame:
    df_transform = transform_df_data(df)
    # Inner join on KEY1 == KEY2; DATE1 and DATE2 both still hold the same START.
    return otherdf.join(
        df_transform, otherdf.KEY2 == df_transform.KEY1, "inner"
    ).select("DATE1", "DATE2")
CodePudding user response:
Not sure I understand the question correctly, but I think you want to join df1 and df2 on KEY1 = KEY2, then join the result again with df1 on KEY1 = KEY3:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Reuse the active session if there is one (e.g. in a notebook or shell).
spark = SparkSession.builder.getOrCreate()

data1 = [("10/05/21", 1, "White", 3000), ("10/06/21", 2, "Blue", 4100), ("10/07/21", 3, "Green", 6200)]
df1 = spark.createDataFrame(data1, ["START", "KEY1", "Color", "OTHER"])

data2 = [(1, 2, 3000), (2, 3, 4100), (3, 1, 6200)]
df2 = spark.createDataFrame(data2, ["KEY2", "KEY3", "NUMBER"])

# First join: START becomes DATE1 where KEY1 == KEY2.
# Second join: START becomes DATE2 where KEY1 == KEY3.
df_result = df1.withColumnRenamed("START", "DATE1").join(
    df2,
    F.col("KEY1") == F.col("KEY2")
).select("DATE1", "KEY3").join(
    df1.withColumnRenamed("START", "DATE2"),
    F.col("KEY1") == F.col("KEY3")
).select("DATE1", "DATE2")

df_result.show()
# +--------+--------+
# |   DATE1|   DATE2|
# +--------+--------+
# |10/07/21|10/05/21|
# |10/05/21|10/06/21|
# |10/06/21|10/07/21|
# +--------+--------+
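
If it helps to check the expected rows without a Spark session, the same two-lookup logic can be sketched in plain Python. This is only an illustration of what the two inner joins compute, not a replacement for the PySpark code: `date_pairs` is a hypothetical helper name, the rows are copied from the example tables in the question, and it assumes KEY1 is unique in df1.

```python
# Rows copied from the example tables in the question.
df1_rows = [
    {"START": "10/05/21", "KEY1": 1, "Color": "White", "OTHER": 3000},
    {"START": "10/06/21", "KEY1": 2, "Color": "Blue", "OTHER": 4100},
    {"START": "10/07/21", "KEY1": 3, "Color": "Green", "OTHER": 6200},
]
df2_rows = [
    {"KEY2": 1, "KEY3": 2, "NUMBER": 3000},
    {"KEY2": 2, "KEY3": 3, "NUMBER": 4100},
    {"KEY2": 3, "KEY3": 1, "NUMBER": 6200},
]


def date_pairs(df1_rows, df2_rows):
    """Hypothetical helper: mimic two inner joins of df2 against df1.

    Assumes KEY1 is unique in df1, so each key maps to one START value.
    """
    start_by_key1 = {row["KEY1"]: row["START"] for row in df1_rows}
    pairs = []
    for row in df2_rows:
        # Inner-join semantics: drop rows where either key has no match.
        if row["KEY2"] in start_by_key1 and row["KEY3"] in start_by_key1:
            date1 = start_by_key1[row["KEY2"]]  # START where KEY1 == KEY2
            date2 = start_by_key1[row["KEY3"]]  # START where KEY1 == KEY3
            pairs.append((date1, date2))
    return pairs


result = date_pairs(df1_rows, df2_rows)
for date1, date2 in result:
    print(date1, date2)
```

Each df2 row looks up START twice, once through KEY2 and once through KEY3, which is why the PySpark version needs to join df1 two times rather than once.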