Let's say I have this df:
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Sample people rows: (firstname, middlename, lastname, id, gender, salary).
data2 = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
]
# Every column is a nullable string except salary, which is a nullable integer.
schema = StructType(
    [
        StructField(name, StringType(), True)
        for name in ("firstname", "middlename", "lastname", "id", "gender")
    ]
    + [StructField("salary", IntegerType(), True)]
)
df1 = spark.createDataFrame(data2, schema)
df1.show(truncate=False)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
+---------+----------+--------+-----+------+------+
Then I have this df which is a metadata table with key fields in df1:
# Metadata frame with one row: col_lst names columns of df1 and id_lst holds
# the id paired (by position) with each of those names.
df2 = spark.createDataFrame(
    data=[(["firstname", "lastname"], ["001", "002"])],
    schema=["col_lst", "id_lst"],
)
df2.show(truncate=False)
+---------------------+----------+
|col_lst              |id_lst    |
+---------------------+----------+
|[firstname, lastname]|[001, 002]|
+---------------------+----------+
I want to add a column to df1 that combines each (id, value) pair described by
df2 into a JSON structure, where "value" is the row's value in the column named in col_lst. Desired output:
+---------+----------+--------+-----+------+------+--------------------------------------------------------------------------------+
|firstname|middlename|lastname|id   |gender|salary|JSON                                                                            |
+---------+----------+--------+-----+------+------+--------------------------------------------------------------------------------+
|James    |          |Smith   |36636|M     |3000  |{"business_key":[{"id":"001","value":"James"},{"id":"002","value":"Smith"}]}    |
|Robert   |          |Williams|42114|M     |4000  |{"business_key":[{"id":"001","value":"Robert"},{"id":"002","value":"Williams"}]}|
|Maria    |Anne      |Jones   |39192|F     |4000  |{"business_key":[{"id":"001","value":"Maria"},{"id":"002","value":"Jones"}]}    |
+---------+----------+--------+-----+------+------+--------------------------------------------------------------------------------+
CodePudding user response:
You can create an array of structs from df1.columns, filter it using the names
in df2.col_lst, and zip the result with id_lst.
If df2 has only a few rows, it would be simpler to collect them into variables
and use them as literal expressions, which avoids the cross join.
The version below uses a cross join:
# Build the JSON column in three steps:
#   1. cross join df1 with df2 so every row can see col_lst / id_lst;
#   2. "info": one struct per df1 column, k = column name, v = the row's value;
#   3. keep only the structs whose name appears in col_lst, take their values,
#      and zip them position-by-position with id_lst.
# NOTE: filter() keeps the df1.columns order, so each id lines up with the
# right value only if col_lst lists the columns in the order they appear in df1.
df3 = df1.crossJoin(df2).withColumn(
    "info",
    F.array(*[F.struct(F.lit(c).alias("k"), F.col(c).alias("v")) for c in df1.columns]),
).withColumn(
    "JSON",
    F.to_json(
        F.struct(
            F.arrays_zip(
                # "id" goes first so the JSON keys come out as id, value.
                F.col("id_lst").alias("id"),
                # .v is the row's value; .k would only repeat the column name.
                F.filter("info", lambda x: F.array_contains("col_lst", x["k"])).v.alias("value"),
            ).alias("business_key")
        )
    ),
).select(df1["*"], "JSON")
df3.show(truncate=False)
# Expected output:
# +---------+----------+--------+-----+------+------+--------------------------------------------------------------------------------+
# |firstname|middlename|lastname|id   |gender|salary|JSON                                                                            |
# +---------+----------+--------+-----+------+------+--------------------------------------------------------------------------------+
# |James    |          |Smith   |36636|M     |3000  |{"business_key":[{"id":"001","value":"James"},{"id":"002","value":"Smith"}]}    |
# |Robert   |          |Williams|42114|M     |4000  |{"business_key":[{"id":"001","value":"Robert"},{"id":"002","value":"Williams"}]}|
# |Maria    |Anne      |Jones   |39192|F     |4000  |{"business_key":[{"id":"001","value":"Maria"},{"id":"002","value":"Jones"}]}    |
# +---------+----------+--------+-----+------+------+--------------------------------------------------------------------------------+