How to implode multiple columns into one struct in spark

Time:05-05

I have a spark dataframe with the following schema:

  • headers
  • key
  • id
  • timestamp
  • metricVal1
  • metricVal2

I want to combine multiple columns into one struct such that the resultant schema becomes:

  • headers (col)
  • key (col)
  • value (struct)
    • id (col)
    • timestamp (col)
    • metricVal1 (col)
    • metricVal2 (col)

I want the data in this format so that it is suitable as Kafka input. How can I achieve this?

CodePudding user response:

You can use the struct function to group the columns into a single struct column. Starting from a DataFrame like this:

res26.show()
+-------+------+-----+------------+------------+
|Account|Amount|order|    meteric1|    meteric2|
+-------+------+-----+------------+------------+
| 643100| 10000|    1|           0|           0|
| 234100|  4000|    2|  8589934592|  8589934592|
| 124562| 20000|    9| 17179869184| 17179869184|
| 234567|  5000|   10| 17179869185| 17179869185|
| 643304| 40000|    8| 25769803776| 25769803776|
| 124562| 20000|    9| 34359738368| 34359738368|
| 234567|  5000|   10| 34359738369| 34359738369|
| 643304| 40000|    8| 42949672960| 42949672960|
| 643100| 10000|    1| 51539607552| 51539607552|
| 234100|  4000|    2| 60129542144| 60129542144|
| 231300|  1000|    3| 68719476736| 68719476736|
| 136400|  5000|    4| 77309411328| 77309411328|
| 643841| 20000|    5| 77309411329| 77309411329|
| 432176| 10000|    7| 85899345920| 85899345920|
| 562100| 10000|    6| 94489280512| 94489280512|
| 432176| 10000|    7|103079215104|103079215104|
| 562100| 10000|    6|111669149696|111669149696|
| 231300|  1000|    3|120259084288|120259084288|
| 136400|  5000|    4|128849018880|128849018880|
| 643841| 20000|    5|128849018881|128849018881|
+-------+------+-----+------------+------------+


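import org.apache.spark.sql.functions.{col, struct}

// Keep Account and Amount as top-level columns; nest the remaining columns under "Value"
res26.select(
  res26("Account"),
  res26("Amount"),
  struct(col("order"), col("meteric1"), col("meteric2")).as("Value")
).show(true)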
+-------+------+--------------------+
|Account|Amount|               Value|
+-------+------+--------------------+
| 643100| 10000|           [1, 0, 0]|
| 234100|  4000|[2, 8589934592, 8...|
| 124562| 20000|[9, 17179869184, ...|
| 234567|  5000|[10, 17179869185,...|
| 643304| 40000|[8, 25769803776, ...|
| 124562| 20000|[9, 34359738368, ...|
| 234567|  5000|[10, 34359738369,...|
| 643304| 40000|[8, 42949672960, ...|
| 643100| 10000|[1, 51539607552, ...|
| 234100|  4000|[2, 60129542144, ...|
| 231300|  1000|[3, 68719476736, ...|
| 136400|  5000|[4, 77309411328, ...|
| 643841| 20000|[5, 77309411329, ...|
| 432176| 10000|[7, 85899345920, ...|
| 562100| 10000|[6, 94489280512, ...|
| 432176| 10000|[7, 103079215104,...|
| 562100| 10000|[6, 111669149696,...|
| 231300|  1000|[3, 120259084288,...|
| 136400|  5000|[4, 128849018880,...|
| 643841| 20000|[5, 128849018881,...|
+-------+------+--------------------+

CodePudding user response:

In PySpark, here is a minimal demo using struct.

Create a Spark dataframe from a list of data

data = [('head1', 'id1', 'timestamp1'), ('head2', 'id2', 'timestamp2'), ('head03', 'id3', 'timestamp3')]
df = spark.createDataFrame(data, ['headers', 'id', 'timestamp'])
df.show()
# +-------+---+----------+
# |headers| id| timestamp|
# +-------+---+----------+
# |  head1|id1|timestamp1|
# |  head2|id2|timestamp2|
# | head03|id3|timestamp3|
# +-------+---+----------+

# pretty-print dataframe schema  
df.printSchema()
# root
#  |-- headers: string (nullable = true)
#  |-- id: string (nullable = true)
#  |-- timestamp: string (nullable = true)

Use struct to combine multiple columns into a single struct column:

from pyspark.sql.functions import struct
df1 = df.select('headers', struct('id', 'timestamp').alias('value'))
df1.show()
# +-------+-----------------+
# |headers|            value|
# +-------+-----------------+
# |  head1|{id1, timestamp1}|
# |  head2|{id2, timestamp2}|
# | head03|{id3, timestamp3}|
# +-------+-----------------+
df1.printSchema()
# root
#  |-- headers: string (nullable = true)
#  |-- value: struct (nullable = false)
#  |    |-- id: string (nullable = true)
#  |    |-- timestamp: string (nullable = true)