How to dynamically convert dataframe columns to map


How can I convert dataframe columns to a map in PySpark when the columns are dynamic?

Input dataframe:

key_column  Column_1  Column_2  .....  Column_N
1           Value_1   Value_2   .....  Value_N
1           Value_a   Value_2   .....  Value_Z
2           Value_1   Value_2   .....  Value_N

Expected output dataframe:

key_column  Map_output
1           {"Column_1":"Value_1, Value_a", "Column_2":"Value_2", ......, "Column_N":"Value_N, Value_Z"}
2           {"Column_1":"Value_1", "Column_2":"Value_2", ......, "Column_N":"Value_N"}

CodePudding user response:

We can use the create_map function with functools.reduce(), which flattens the per-column [literal name, column] pairs into the alternating key/value arguments that create_map expects.

from functools import reduce
from pyspark.sql import functions as func

col_list = ['col1', 'col2', 'col3']  # use sdf.columns for all columns in the dataframe

spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
    toDF(['col1', 'col2', 'col3']). \
    withColumn('allcol_map',
               # flatten [[lit(name), col(name)], ...] into alternating key/value arguments
               func.create_map(*reduce(lambda x, y: x + y, [[func.lit(k), func.col(k)] for k in col_list]))
               ). \
    show(truncate=False)

# +-----+-----+-----+---------------------------------------------+
# |col1 |col2 |col3 |allcol_map                                   |
# +-----+-----+-----+---------------------------------------------+
# |val01|val02|val03|{col1 -> val01, col2 -> val02, col3 -> val03}|
# |val11|val12|val13|{col1 -> val11, col2 -> val12, col3 -> val13}|
# +-----+-----+-----+---------------------------------------------+

# root
#  |-- col1: string (nullable = true)
#  |-- col2: string (nullable = true)
#  |-- col3: string (nullable = true)
#  |-- allcol_map: map (nullable = false)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
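If the map should include every column except the key column (as in the question), the list can be derived instead of hard-coded. A one-line sketch, with sdf and key_column as placeholder names:

# hypothetical: every column except the grouping key becomes a map entry
col_list = [c for c in sdf.columns if c != 'key_column']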

We can also use the map_from_entries function, which takes an array of structs; each struct's two fields become one key/value entry in the map. It produces the same result as above.

col_list = ['col1', 'col2', 'col3']  # use sdf.columns for all columns in the dataframe

spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
    toDF(['col1', 'col2', 'col3']). \
    withColumn('allcol_map', 
               # each struct('key', 'val') becomes one entry in the resulting map
               func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
               ). \
    show(truncate=False)
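For completeness, Spark 2.4+ also provides map_from_arrays, which builds a map from one array of keys and one array of values. A sketch in the same style, reusing col_list and the func alias from above:

spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
    toDF(['col1', 'col2', 'col3']). \
    withColumn('allcol_map',
               # keys: the literal column names; values: the corresponding column contents
               func.map_from_arrays(func.array(*[func.lit(k) for k in col_list]),
                                    func.array(*[func.col(k) for k in col_list]))
               ). \
    show(truncate=False)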

Based on the update, you'd like to group by a key column. Looking at the new expected output, you can use concat_ws with collect_list (to keep all values) or collect_set (to keep unique values) to combine each column's values per group before building the map; the intermediate concatenated columns can then be dropped, as shown after the output below.

col_list = ['col1', 'col2', 'col3']

spark.sparkContext.parallelize([('part0', 'val01', 'val02', 'val03'), ('part0', 'val11', 'val12', 'val13'), ('part1', 'val21', 'val22', 'val23')]). \
    toDF(['key_column', 'col1', 'col2', 'col3']). \
    groupBy('key_column'). \
    agg(*[func.concat_ws(',', func.collect_set(k)).alias(k)  # collect_set keeps unique values; use collect_list to keep all
          for k in col_list]). \
    withColumn('allcol_map', 
               func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
               ). \
    show(truncate=False)

# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
# |key_column|col1       |col2       |col3       |allcol_map                                                     |
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
# |part1     |val21      |val22      |val23      |{col1 -> val21, col2 -> val22, col3 -> val23}                  |
# |part0     |val01,val11|val02,val12|val03,val13|{col1 -> val01,val11, col2 -> val02,val12, col3 -> val03,val13}|
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
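To match the question's expected output exactly, drop the intermediate concatenated columns at the end; assuming the aggregated result above is bound to a variable (agg_df is a hypothetical name):

# keep only the key and the map, matching the asker's expected shape
agg_df.select('key_column', 'allcol_map').show(truncate=False)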

CodePudding user response:

You can build the map in one expression by serializing a struct of all columns to JSON and parsing it back as a map of strings:

F.from_json(F.to_json(F.struct(df.columns)), 'map<string,string>')

Example:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('Value_1', 'Value_2', 'Value_N'),
     ('Value_a', 'Value_b', 'Value_M')],
    ['Column_1', 'Column_2', 'Column_N'])

df = df.select(F.from_json(F.to_json(F.struct(df.columns)), 'map<string,string>').alias('Map_output'))

df.show(truncate=0)
# +---------------------------------------------------------------+
# |Map_output                                                     |
# +---------------------------------------------------------------+
# |{Column_1 -> Value_1, Column_2 -> Value_2, Column_N -> Value_N}|
# |{Column_1 -> Value_a, Column_2 -> Value_b, Column_N -> Value_M}|
# +---------------------------------------------------------------+
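This trick combines naturally with the grouping requirement from the question's update. A sketch (the value_cols name and sample rows are mine, mirroring the question's input; note that collect_set gives no ordering guarantee inside the concatenated string):

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, 'Value_1', 'Value_2', 'Value_N'),
     (1, 'Value_a', 'Value_2', 'Value_Z'),
     (2, 'Value_1', 'Value_2', 'Value_N')],
    ['key_column', 'Column_1', 'Column_2', 'Column_N'])

value_cols = [c for c in df.columns if c != 'key_column']

result = (
    df.groupBy('key_column')
      # collect_set de-duplicates per group; use collect_list to keep every value
      .agg(*[F.concat_ws(', ', F.collect_set(c)).alias(c) for c in value_cols])
      .select('key_column',
              F.from_json(F.to_json(F.struct(value_cols)), 'map<string,string>').alias('Map_output')))

result.show(truncate=False)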