How do I create a pivot like this in PySpark?


I have a PySpark dataframe df:

STORE  COL_APPLE_BB  COL_APPLE_NONBB  COL_PEAR_BB  COL_PEAR_NONBB  COL_ORANGE_BB  COL_ORANGE_NONBB  COL_GRAPE_BB  COL_GRAPE_NONBB
1      28            24               24           32              26             54                60            36
2      19            12               24           13              10             24                29            10

I have another PySpark dataframe df2:

STORE  PDT  FRUIT   TYPE
1      1    APPLE   BB
1      2    ORANGE  NONBB
1      3    PEAR    BB
1      4    GRAPE   BB
1      5    APPLE   BB
1      6    ORANGE  BB
2      1    PEAR    NONBB
2      2    ORANGE  NONBB
2      3    APPLE   NONBB

Expected PySpark df2 with a column COL_VALUE for the respective store, fruit, and type:

STORE  PDT  FRUIT   TYPE   COL_VALUE
1      1    APPLE   BB     28
1      2    ORANGE  NONBB  54
1      3    PEAR    BB     24
1      4    GRAPE   BB     60
1      5    APPLE   BB     28
1      6    ORANGE  BB     26
2      1    PEAR    NONBB  13
2      2    ORANGE  NONBB  24
2      3    APPLE   NONBB  12

CodePudding user response:


from pyspark.sql.functions import *

df = spark.createDataFrame(
    [
        (1, 28, 24, 24, 32, 26, 54, 60, 36),
        (2, 19, 12, 24, 13, 10, 24, 29, 10)
    ],
    ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB", "COL_PEAR_BB", "COL_PEAR_NONBB",
     "COL_ORANGE_BB", "COL_ORANGE_NONBB", "COL_GRAPE_BB", "COL_GRAPE_NONBB"]
)


df2 = spark.createDataFrame(
    [
        (1, 1, "APPLE", "BB"),
        (1, 2, "ORANGE", "NONBB"),
        (1, 3, "PEAR", "BB"),
        (1, 4, "GRAPE", "BB"),
        (1, 5, "APPLE", "BB"),
        (1, 6, "ORANGE", "BB"),
        (2, 1, "PEAR", "NONBB"),
        (2, 2, "ORANGE", "NONBB"),
        (2, 3, "APPLE", "NONBB")
    ],
    ["STORE", "PDT", "FRUIT", "TYPE"]
)

# Unpivot df: each COL_<FRUIT>_<TYPE> column becomes one (Appended, COL_VALUE) row per store
unPivot_df = df.select("STORE", expr("""stack(8,
    'APPLE_BB',     COL_APPLE_BB,
    'APPLE_NONBB',  COL_APPLE_NONBB,
    'PEAR_BB',      COL_PEAR_BB,
    'PEAR_NONBB',   COL_PEAR_NONBB,
    'ORANGE_BB',    COL_ORANGE_BB,
    'ORANGE_NONBB', COL_ORANGE_NONBB,
    'GRAPE_BB',     COL_GRAPE_BB,
    'GRAPE_NONBB',  COL_GRAPE_NONBB) as (Appended, COL_VALUE)"""))

# Build the matching FRUIT_TYPE key in df2 and join it to the unpivoted values
df2 = df2.withColumn("Appended", concat_ws('_', col("FRUIT"), col("TYPE")))
df2 = df2.join(unPivot_df, ["STORE", "Appended"], "left")
df2.show()

+-----+------------+---+------+-----+---------+
|STORE|    Appended|PDT| FRUIT| TYPE|COL_VALUE|
+-----+------------+---+------+-----+---------+
|    1|ORANGE_NONBB|  2|ORANGE|NONBB|       54|
|    1|     PEAR_BB|  3|  PEAR|   BB|       24|
|    1|    GRAPE_BB|  4| GRAPE|   BB|       60|
|    1|    APPLE_BB|  1| APPLE|   BB|       28|
|    2|ORANGE_NONBB|  2|ORANGE|NONBB|       24|
|    2| APPLE_NONBB|  3| APPLE|NONBB|       12|
|    1|   ORANGE_BB|  6|ORANGE|   BB|       26|
|    1|    APPLE_BB|  5| APPLE|   BB|       28|
|    2|  PEAR_NONBB|  1|  PEAR|NONBB|       13|
+-----+------------+---+------+-----+---------+
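
If you want the result to match the expected columns exactly, the helper Appended column can be dropped after the join (a small optional step, not shown in the answer above):

df2.select("STORE", "PDT", "FRUIT", "TYPE", "COL_VALUE").show()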

CodePudding user response:

If you have Spark 3.2 or higher, you could use melt (the signature below is from the pandas API on Spark) with something like:

data = data.melt(
    id_vars=['STORE'],
    value_vars=data.columns[1:], 
    var_name="variable", 
    value_name="value"
)

to get a "long" form of the dataset, and then use regex_extract twice to get the required information from the variable column.

For earlier versions of Spark, use the following:

def process_row(row):
    # Turn one wide row into one (STORE, index, FRUIT, TYPE, value) tuple
    # per COL_<FRUIT>_<TYPE> column; relies on STORE being the first column.
    output = []
    for index, key in enumerate(row.asDict()):
        if key == "STORE":
            store = row[key]
        else:
            _, fruit, fruit_type = key.split("_")
            output.append((store, index, fruit, fruit_type, row[key]))
    return output


data = data.rdd.flatMap(process_row).toDF(
    schema=["STORE", "PDT", "FRUIT", "TYPE", "COLUMN_VALUE"]
)
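
To attach these values to df2 as asked in the question, the long form can then be joined back on STORE, FRUIT and TYPE (a follow-up sketch, not part of the original answer; the generated PDT above is just a column index, so df2's own PDT is kept):

result = df2.join(
    data.select("STORE", "FRUIT", "TYPE", "COLUMN_VALUE"),
    on=["STORE", "FRUIT", "TYPE"],
    how="left"
)
result.show()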

CodePudding user response:

As an alternative to melt, you can use stack in earlier Spark versions:

df = spark.createDataFrame(
    [
        (1, 28, 24),
        (2, 19, 12),
    ],
    ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB"]
)

df2 = spark.createDataFrame(
    [
        (1, 1, "APPLE", "BB"),
        (1, 2, "ORANGE", "NONBB"),
        (1, 2, "APPLE", "NONBB"),
        (2, 3, "APPLE", "NONBB")
    ],
    ["STORE", "PDT", "FRUIT", "TYPE"]
)

Create a column that matches the COL_<FRUIT>_<TYPE> column names in df:

from pyspark.sql import functions as F

df3 = df2.withColumn("fruit_type", F.concat(F.lit("COL_"), F.col("FRUIT"), F.lit("_"), F.col("TYPE")))
df3.show(10, False)

gives:

+-----+---+------+-----+----------------+
|STORE|PDT|FRUIT |TYPE |fruit_type      |
+-----+---+------+-----+----------------+
|1    |1  |APPLE |BB   |COL_APPLE_BB    |
|1    |2  |ORANGE|NONBB|COL_ORANGE_NONBB|
|1    |2  |APPLE |NONBB|COL_APPLE_NONBB |
+-----+---+------+-----+----------------+

Then "unpivot" the first df:

from pyspark.sql.functions import expr

# Build the stack() expression from the column names, skipping the STORE column
unpivotExpr = "stack({}, {}) as (fruit_type, COL_VALUE)".format(
    len(df.columns) - 1,
    ", ".join("'{}', {}".format(c, c) for c in df.columns[1:])
)
print(unpivotExpr)

unPivotDF = df.select("STORE", expr(unpivotExpr)) \
              .where("STORE is not null")

unPivotDF.show(truncate=False)

The stack function takes as arguments: first, the number of "columns" it will be unpivoting (here len(df.columns) - 1, since we skip the STORE column); then, for plain column/value pairs, a list of those pairs in the form 'label', column_name. The "'{}', {}".format(c, c) part takes each column of df except the first one (STORE) and turns it into such a pair, for example 'COL_APPLE_BB', COL_APPLE_BB. These pairs are joined into a comma-separated string (", ".join(...)) and substituted into the {} placeholders. A typical hand-written call to stack looks like: "stack(2, 'COL_APPLE_BB', COL_APPLE_BB, 'COL_APPLE_NONBB', COL_APPLE_NONBB) as (fruit_type, COL_VALUE)".
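
As a standalone illustration (plain Python, no Spark needed), the expression-building step produces:

cols = ["STORE", "COL_APPLE_BB", "COL_APPLE_NONBB"]

pairs = ", ".join("'{}', {}".format(c, c) for c in cols[1:])
print(pairs)
# 'COL_APPLE_BB', COL_APPLE_BB, 'COL_APPLE_NONBB', COL_APPLE_NONBB

print("stack({}, {}) as (fruit_type, COL_VALUE)".format(len(cols) - 1, pairs))
# stack(2, 'COL_APPLE_BB', COL_APPLE_BB, 'COL_APPLE_NONBB', COL_APPLE_NONBB) as (fruit_type, COL_VALUE)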

The unPivotDF.show(truncate=False) outputs:

+-----+---------------+---------+
|STORE|fruit_type     |COL_VALUE|
+-----+---------------+---------+
|1    |COL_APPLE_BB   |28       |
|1    |COL_APPLE_NONBB|24       |
|2    |COL_APPLE_BB   |19       |
|2    |COL_APPLE_NONBB|12       |
+-----+---------------+---------+

and join the two:

df3.join(unPivotDF, ["fruit_type", "STORE"], "left")\
   .select("STORE", "PDT", "FRUIT", "TYPE", "COL_VALUE").show(40, False)

result:

+-----+---+------+-----+---------+
|STORE|PDT|FRUIT |TYPE |COL_VALUE|
+-----+---+------+-----+---------+
|1    |2  |ORANGE|NONBB|null     |
|1    |2  |APPLE |NONBB|24       |
|1    |1  |APPLE |BB   |28       |
|2    |3  |APPLE |NONBB|12       |
+-----+---+------+-----+---------+

The drawback is that you need to enumerate the column names in stack; if I figure out a way to do this automatically, I will update the answer.

EDIT: I have updated the use of the stack function, so that it can derive the columns by itself.
