I have a PySpark dataframe df:
STORE | COL_APPLE_BB | COL_APPLE_NONBB | COL_PEAR_BB | COL_PEAR_NONBB | COL_ORANGE_BB | COL_ORANGE_NONBB | COL_GRAPE_BB | COL_GRAPE_NONBB |
---|---|---|---|---|---|---|---|---|
1 | 28 | 24 | 24 | 32 | 26 | 54 | 60 | 36 |
2 | 19 | 12 | 24 | 13 | 10 | 24 | 29 | 10 |
I have another PySpark dataframe df2:
STORE | PDT | FRUIT | TYPE |
---|---|---|---|
1 | 1 | APPLE | BB |
1 | 2 | ORANGE | NONBB |
1 | 3 | PEAR | BB |
1 | 4 | GRAPE | BB |
1 | 5 | APPLE | BB |
1 | 6 | ORANGE | BB |
2 | 1 | PEAR | NONBB |
2 | 2 | ORANGE | NONBB |
2 | 3 | APPLE | NONBB |
Expected output: df2 with an added column COL_VALUE holding the matching value for each store, fruit, and type:
STORE | PDT | FRUIT | TYPE | COL_VALUE |
---|---|---|---|---|
1 | 1 | APPLE | BB | 28 |
1 | 2 | ORANGE | NONBB | 54 |
1 | 3 | PEAR | BB | 24 |
1 | 4 | GRAPE | BB | 60 |
1 | 5 | APPLE | BB | 28 |
1 | 6 | ORANGE | BB | 26 |
2 | 1 | PEAR | NONBB | 13 |
2 | 2 | ORANGE | NONBB | 24 |
2 | 3 | APPLE | NONBB | 12 |
CodePudding user response:
from pyspark.sql.functions import expr, concat_ws, col
df = spark.createDataFrame(
[
(1, 28, 24, 24, 32, 26, 54, 60, 36),
(2, 19, 12, 24, 13, 10, 24, 29, 10)
],
["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB", "COL_PEAR_BB", "COL_PEAR_NONBB", "COL_ORANGE_BB", "COL_ORANGE_NONBB", "COL_GRAPE_BB","COL_GRAPE_NONBB"]
)
df2 = spark.createDataFrame(
[
(1, 1, "APPLE", "BB"),
(1, 2, "ORANGE", "NONBB"),
(1, 3, "PEAR", "BB"),
(1, 4, "GRAPE", "BB"),
(1, 5, "APPLE", "BB"),
(1, 6, "ORANGE", "BB"),
(2, 1, "PEAR", "NONBB"),
(2, 2, "ORANGE", "NONBB"),
(2, 3, "APPLE", "NONBB")
],
["STORE", "PDT", "FRUIT", "TYPE"]
)
unPivot_df = df.select(
    "STORE",
    expr("""stack(8,
        'APPLE_BB', COL_APPLE_BB,
        'APPLE_NONBB', COL_APPLE_NONBB,
        'PEAR_BB', COL_PEAR_BB,
        'PEAR_NONBB', COL_PEAR_NONBB,
        'ORANGE_BB', COL_ORANGE_BB,
        'ORANGE_NONBB', COL_ORANGE_NONBB,
        'GRAPE_BB', COL_GRAPE_BB,
        'GRAPE_NONBB', COL_GRAPE_NONBB
    ) as (Appended, COL_VALUE)""")
)
df2 = df2.withColumn("Appended",concat_ws('_',col("FRUIT"),col("TYPE")))
df2 = df2.join(unPivot_df,['STORE',"Appended"],"left")
df2.show()
+-----+------------+---+------+-----+---------+
|STORE|    Appended|PDT| FRUIT| TYPE|COL_VALUE|
+-----+------------+---+------+-----+---------+
|    1|ORANGE_NONBB|  2|ORANGE|NONBB|       54|
|    1|     PEAR_BB|  3|  PEAR|   BB|       24|
|    1|    GRAPE_BB|  4| GRAPE|   BB|       60|
|    1|    APPLE_BB|  1| APPLE|   BB|       28|
|    2|ORANGE_NONBB|  2|ORANGE|NONBB|       24|
|    2| APPLE_NONBB|  3| APPLE|NONBB|       12|
|    1|   ORANGE_BB|  6|ORANGE|   BB|       26|
|    1|    APPLE_BB|  5| APPLE|   BB|       28|
|    2|  PEAR_NONBB|  1|  PEAR|NONBB|       13|
+-----+------------+---+------+-----+---------+
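Conceptually, the unpivot-and-join above is just a keyed lookup from the wide columns. The same logic can be sanity-checked without Spark in plain Python (a dict-based sketch using the example data; the helper name lookup is hypothetical, not part of the answer):

```python
# Wide rows keyed by STORE, matching the example df.
wide = {
    1: {"COL_APPLE_BB": 28, "COL_APPLE_NONBB": 24, "COL_PEAR_BB": 24,
        "COL_PEAR_NONBB": 32, "COL_ORANGE_BB": 26, "COL_ORANGE_NONBB": 54,
        "COL_GRAPE_BB": 60, "COL_GRAPE_NONBB": 36},
    2: {"COL_APPLE_BB": 19, "COL_APPLE_NONBB": 12, "COL_PEAR_BB": 24,
        "COL_PEAR_NONBB": 13, "COL_ORANGE_BB": 10, "COL_ORANGE_NONBB": 24,
        "COL_GRAPE_BB": 29, "COL_GRAPE_NONBB": 10},
}

def lookup(store, fruit, typ):
    # Same key the join uses: COL_<FRUIT>_<TYPE>
    return wide[store]["COL_{}_{}".format(fruit, typ)]

print(lookup(1, "APPLE", "BB"))  # 28, matching the expected table
```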
CodePudding user response:
If you have Spark 3.4 or higher you can use DataFrame.melt:
data = data.melt(
    ids=["STORE"],
    values=data.columns[1:],
    variableColumnName="variable",
    valueColumnName="value"
)
to get a "long" form of the dataset, and then use regexp_extract twice to pull FRUIT and TYPE out of the variable column.
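The answer does not spell out the regexp_extract patterns; a pattern that would work, assuming column names of the form COL_&lt;FRUIT&gt;_&lt;TYPE&gt;, can be tried out with Python's re module (a hedged sketch, not part of the original answer):

```python
import re

def split_variable(name):
    # Assumes names like COL_APPLE_BB: COL_, then FRUIT, then TYPE,
    # where neither FRUIT nor TYPE contains an underscore.
    m = re.match(r"COL_([A-Z]+)_([A-Z]+)$", name)
    return m.group(1), m.group(2)
```

The same two capture groups would be the patterns handed to regexp_extract in Spark.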
For earlier versions of Spark, use the following:
def process_row(row):
    output = []
    # NOTE: relies on STORE being the first column, so that `store`
    # is bound before any other key is processed.
    for index, key in enumerate(row.asDict()):
        if key == "STORE":
            store = row[key]
        else:
            # column names look like COL_<FRUIT>_<TYPE>
            _, fruit, fruit_type = key.split("_")
            output.append((store, index, fruit, fruit_type, row[key]))
    return output
data = data.rdd.flatMap(process_row).toDF(
    schema=["STORE", "PDT", "FRUIT", "TYPE", "COL_VALUE"]
)
Note that PDT here is the column position within the wide row, not the PDT values from df2.
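Because process_row only uses asDict() and item access, its logic can be exercised without a Spark cluster by copying the function and pairing it with a minimal stand-in for Row (the FakeRow class is a hypothetical test harness, not part of the answer):

```python
class FakeRow:
    """Minimal stand-in for pyspark.sql.Row: ordered asDict() plus [] access."""
    def __init__(self, **kwargs):
        self._d = dict(kwargs)

    def asDict(self):
        return self._d

    def __getitem__(self, key):
        return self._d[key]

def process_row(row):
    output = []
    for index, key in enumerate(row.asDict()):
        if key == "STORE":
            store = row[key]  # relies on STORE being the first column
        else:
            _, fruit, fruit_type = key.split("_")
            output.append((store, index, fruit, fruit_type, row[key]))
    return output

rows = process_row(FakeRow(STORE=1, COL_APPLE_BB=28, COL_APPLE_NONBB=24))
print(rows)  # [(1, 1, 'APPLE', 'BB', 28), (1, 2, 'APPLE', 'NONBB', 24)]
```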
CodePudding user response:
As an alternative to melt, you can use stack, which also works in earlier Spark versions:
from pyspark.sql import functions as F

df = spark.createDataFrame(
[
(1, 28, 24),
(2, 19, 12),
],
["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB"]
)
df2 = spark.createDataFrame(
[
(1, 1, "APPLE", "BB"),
(1, 2, "ORANGE", "NONBB"),
(1, 2, "APPLE", "NONBB"),
(2, 3, "APPLE", "NONBB")
],
["STORE", "PDT", "FRUIT", "TYPE"]
)
Create a column in df2 that matches the COL_<FRUIT>_<TYPE> column names in df:
df3 = df2.withColumn("fruit_type", F.concat(F.lit("COL_"), F.col("FRUIT"), F.lit("_"), F.col("TYPE")))
df3.show(10, False)
gives:
+-----+---+------+-----+----------------+
|STORE|PDT|FRUIT |TYPE |fruit_type      |
+-----+---+------+-----+----------------+
|1    |1  |APPLE |BB   |COL_APPLE_BB    |
|1    |2  |ORANGE|NONBB|COL_ORANGE_NONBB|
|1    |2  |APPLE |NONBB|COL_APPLE_NONBB |
|2    |3  |APPLE |NONBB|COL_APPLE_NONBB |
+-----+---+------+-----+----------------+
Then "unpivot" the first df
:
from pyspark.sql.functions import expr
unpivotExpr = "stack({}, {}) as (fruit_type, COL_VALUE)".format(
    len(df.columns) - 1,
    ", ".join("'{0}', {0}".format(c) for c in df.columns[1:])
)
print(unpivotExpr)
unPivotDF = df.select("STORE", expr(unpivotExpr)) \
.where("STORE is not null")
unPivotDF.show(truncate=False)
The stack function takes as arguments: first, the number of label/column pairs it will be "unpivoting" (derived here as len(df.columns) - 1, since the STORE column is skipped); then the pairs themselves, each in the form 'label', column_name. The comprehension over df.columns[1:] skips the first column (STORE) and produces one such pair per remaining column, e.g. 'COL_APPLE_BB', COL_APPLE_BB. These pairs are joined into a comma-separated string, which then replaces the second {} placeholder in unpivotExpr.
For the two-column df above, the generated expression is:
"stack(2, 'COL_APPLE_BB', COL_APPLE_BB, 'COL_APPLE_NONBB', COL_APPLE_NONBB) as (fruit_type, COL_VALUE)"
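The expression construction itself is plain Python string formatting, so it can be verified without Spark (a standalone sketch using the two-column example):

```python
cols = ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB"]

# Same construction as unpivotExpr above: one 'label', column pair
# per non-STORE column, joined into the stack(...) call.
pairs = ", ".join("'{0}', {0}".format(c) for c in cols[1:])
unpivot_expr = "stack({}, {}) as (fruit_type, COL_VALUE)".format(len(cols) - 1, pairs)
print(unpivot_expr)
```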
The unPivotDF.show(truncate=False)
outputs:
+-----+---------------+---------+
|STORE|fruit_type     |COL_VALUE|
+-----+---------------+---------+
|1    |COL_APPLE_BB   |28       |
|1    |COL_APPLE_NONBB|24       |
|2    |COL_APPLE_BB   |19       |
|2    |COL_APPLE_NONBB|12       |
+-----+---------------+---------+
and join the two:
df3.join(unPivotDF, ["fruit_type", "STORE"], "left")\
.select("STORE", "PDT", "FRUIT", "TYPE", "COL_VALUE").show(40, False)
result:
+-----+---+------+-----+---------+
|STORE|PDT|FRUIT |TYPE |COL_VALUE|
+-----+---+------+-----+---------+
|1    |2  |ORANGE|NONBB|null     |
|1    |2  |APPLE |NONBB|24       |
|1    |1  |APPLE |BB   |28       |
|2    |3  |APPLE |NONBB|12       |
+-----+---+------+-----+---------+
Note that ORANGE_NONBB yields null here simply because the reduced example df has no COL_ORANGE_* columns.
EDIT: I originally had to enumerate the column names in stack by hand; the unpivotExpr above has since been updated so that it derives the pairs automatically from df.columns.