Imagine I have the following table:
unique_id | column_A | column_B |
---|---|---|
123 | 12345 | ABCDEFG |
123 | 23456 | BCDEFGH |
123 | 34567 | CDEFGHI |
234 | 12345 | ABCDEFG |
There are at most 3 rows per unique ID. The result I want to achieve is the following:
unique_id | column_A_1 | column_A_2 | column_A_3 | column_B_1 | column_B_2 | column_B_3 |
---|---|---|---|---|---|---|
123 | 12345 | 23456 | 34567 | ABCDEFG | BCDEFGH | CDEFGHI |
234 | 12345 | | | ABCDEFG | | |
CodePudding user response:
You can assign a row_number to each record and pivot on that. Here's an example of retaining 2 values per ID using your input dataframe (see the note at the end for extending it to 3 and making the order deterministic).
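In case you want to run this end to end, here's a minimal sketch of the setup I'm assuming: an active SparkSession named spark, the func/wd aliases the snippet below relies on, and data_sdf built from your sample rows (with column_A/column_B shortened to col_a/col_b to match the outputs below):

from pyspark.sql import SparkSession, functions as func
from pyspark.sql.window import Window as wd

spark = SparkSession.builder.getOrCreate()

# your sample data, using the column names referenced below
data_sdf = spark.createDataFrame(
    [(123, 12345, 'ABCDEFG'),
     (123, 23456, 'BCDEFGH'),
     (123, 34567, 'CDEFGHI'),
     (234, 12345, 'ABCDEFG')],
    ['unique_id', 'col_a', 'col_b']
)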
# number the rows within each unique_id; note that rand() assigns an
# arbitrary order, so results differ from run to run -- order by a real
# column (e.g. 'col_a') if you need deterministic results
pivoted_sdf = data_sdf. \
    withColumn('rn',
               func.row_number().over(wd.partitionBy('unique_id').orderBy(func.rand()))
               ). \
    filter(func.col('rn') <= 2). \
    groupBy('unique_id'). \
    pivot('rn', values=[1, 2]). \
    agg(func.first('col_a').alias('col_a'),
        func.first('col_b').alias('col_b')
        )
# +---------+-------+-------+-------+-------+
# |unique_id|1_col_a|1_col_b|2_col_a|2_col_b|
# +---------+-------+-------+-------+-------+
# |      234|  12345|ABCDEFG|   null|   null|
# |      123|  34567|CDEFGHI|  23456|BCDEFGH|
# +---------+-------+-------+-------+-------+
Notice the column names: Spark added the row number as a prefix to the aggregation alias. You can rename the columns to turn it into a suffix.
def renameTheCol(column):
    # move the leading row number to the end, e.g. '1_col_a' -> 'col_a_1'
    col_split = column.split('_')
    col_split_rearr = col_split[1:] + [col_split[0]]
    new_column = '_'.join(col_split_rearr)
    return new_column
pivoted_sdf. \
    select('unique_id',
           *[func.col(k).alias(renameTheCol(k)) for k in pivoted_sdf.columns if k != 'unique_id']
           ). \
    show()
# +---------+-------+-------+-------+-------+
# |unique_id|col_a_1|col_b_1|col_a_2|col_b_2|
# +---------+-------+-------+-------+-------+
# |      234|  12345|ABCDEFG|   null|   null|
# |      123|  23456|BCDEFGH|  34567|CDEFGHI|
# +---------+-------+-------+-------+-------+
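Two follow-ups, sketched under the same assumptions as the setup above. First, because rand() is re-evaluated on each action, the two outputs above disagree on which of 123's values landed first; ordering the window by an actual column (e.g. col_a, which matches the order in your desired result) makes it deterministic. Second, since you have at most 3 rows per unique_id, just raise the filter and the pivot values:

pivoted_sdf = data_sdf. \
    withColumn('rn',
               func.row_number().over(wd.partitionBy('unique_id').orderBy('col_a'))
               ). \
    filter(func.col('rn') <= 3). \
    groupBy('unique_id'). \
    pivot('rn', values=[1, 2, 3]). \
    agg(func.first('col_a').alias('col_a'),
        func.first('col_b').alias('col_b')
        )

After the same rename pass, this gives you the col_a_1 through col_b_3 layout from your desired output, with nulls wherever an ID has fewer than 3 rows.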