I have a Spark dataframe with the following schema:
- headers
- key
- id
- timestamp
- metricVal1
- metricVal2
I want to combine multiple columns into one struct such that the resultant schema becomes:
- headers (col)
- key (col)
- value (struct)
- id (col)
- timestamp (col)
- metricVal1 (col)
- metricVal2 (col)
I want it in this format so that it is suitable as Kafka input. How can I achieve this?
CodePudding user response:
You can use struct to group the items:
res26.show()
+-------+------+-----+------------+------------+
|Account|Amount|order| meteric1| meteric2|
+-------+------+-----+------------+------------+
| 643100| 10000| 1| 0| 0|
| 234100| 4000| 2| 8589934592| 8589934592|
| 124562| 20000| 9| 17179869184| 17179869184|
| 234567| 5000| 10| 17179869185| 17179869185|
| 643304| 40000| 8| 25769803776| 25769803776|
| 124562| 20000| 9| 34359738368| 34359738368|
| 234567| 5000| 10| 34359738369| 34359738369|
| 643304| 40000| 8| 42949672960| 42949672960|
| 643100| 10000| 1| 51539607552| 51539607552|
| 234100| 4000| 2| 60129542144| 60129542144|
| 231300| 1000| 3| 68719476736| 68719476736|
| 136400| 5000| 4| 77309411328| 77309411328|
| 643841| 20000| 5| 77309411329| 77309411329|
| 432176| 10000| 7| 85899345920| 85899345920|
| 562100| 10000| 6| 94489280512| 94489280512|
| 432176| 10000| 7|103079215104|103079215104|
| 562100| 10000| 6|111669149696|111669149696|
| 231300| 1000| 3|120259084288|120259084288|
| 136400| 5000| 4|128849018880|128849018880|
| 643841| 20000| 5|128849018881|128849018881|
+-------+------+-----+------------+------------+
import org.apache.spark.sql.functions.{col, struct}

res26.select(res26("Account"), res26("Amount"), struct(col("order"), col("meteric1"), col("meteric2")).as("Value")).show(true)
+-------+------+--------------------+
|Account|Amount| Value|
+-------+------+--------------------+
| 643100| 10000| [1, 0, 0]|
| 234100| 4000|[2, 8589934592, 8...|
| 124562| 20000|[9, 17179869184, ...|
| 234567| 5000|[10, 17179869185,...|
| 643304| 40000|[8, 25769803776, ...|
| 124562| 20000|[9, 34359738368, ...|
| 234567| 5000|[10, 34359738369,...|
| 643304| 40000|[8, 42949672960, ...|
| 643100| 10000|[1, 51539607552, ...|
| 234100| 4000|[2, 60129542144, ...|
| 231300| 1000|[3, 68719476736, ...|
| 136400| 5000|[4, 77309411328, ...|
| 643841| 20000|[5, 77309411329, ...|
| 432176| 10000|[7, 85899345920, ...|
| 562100| 10000|[6, 94489280512, ...|
| 432176| 10000|[7, 103079215104,...|
| 562100| 10000|[6, 111669149696,...|
| 231300| 1000|[3, 120259084288,...|
| 136400| 5000|[4, 128849018880,...|
| 643841| 20000|[5, 128849018881,...|
+-------+------+--------------------+
CodePudding user response:
In PySpark: a minimal demo using struct.
Create a Spark dataframe from a list of data:
data = [('head1', 'id1', 'timestamp1'), ('head2', 'id2', 'timestamp2'), ('head03', 'id3', 'timestamp3')]
df = spark.createDataFrame(data, ['headers', 'id', 'timestamp'])
df.show()
# +-------+---+----------+
# |headers| id| timestamp|
# +-------+---+----------+
# | head1|id1|timestamp1|
# | head2|id2|timestamp2|
# | head03|id3|timestamp3|
# +-------+---+----------+
# pretty-print dataframe schema
df.printSchema()
# root
# |-- headers: string (nullable = true)
# |-- id: string (nullable = true)
# |-- timestamp: string (nullable = true)
Use struct to collate multiple columns into a struct under one column:
from pyspark.sql.functions import struct
df1 = df.select('headers', struct('id', 'timestamp').alias('value'))
df1.show()
# +-------+-----------------+
# |headers| value|
# +-------+-----------------+
# | head1|{id1, timestamp1}|
# | head2|{id2, timestamp2}|
# | head03|{id3, timestamp3}|
# +-------+-----------------+
df1.printSchema()
# root
# |-- headers: string (nullable = true)
# |-- value: struct (nullable = false)
# | |-- id: string (nullable = true)
# | |-- timestamp: string (nullable = true)
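To make this suitable as Kafka input: the Kafka sink expects string or binary key and value columns, so the struct is typically serialized, for example with to_json. A minimal sketch, assuming a dataframe with the question's original columns (headers, key, id, timestamp, metricVal1, metricVal2) and hypothetical broker and topic names:

from pyspark.sql.functions import col, struct, to_json

# assumed input columns: headers, key, id, timestamp, metricVal1, metricVal2
kafka_df = df.select(
    col('key').cast('string'),  # Kafka key must be string or binary
    to_json(
        struct('id', 'timestamp', 'metricVal1', 'metricVal2')
    ).alias('value')            # serialize the struct as a JSON string
)

# hypothetical broker and topic; adjust for your environment
kafka_df.write \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('topic', 'metrics') \
    .save()

If the headers column should travel as Kafka message headers rather than inside the payload, note that the Kafka sink (Spark 3.0+) accepts an optional headers column, which must have the type array<struct<key: string, value: binary>>.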