How to convert data to a map in PySpark for dynamic columns?
Input dataframe:
key_column | Column_1 | Column_2 | ..... | Column_N |
---|---|---|---|---|
1 | Value_1 | Value_2 | ..... | Value_N |
1 | Value_a | Value_2 | ...... | Value_Z |
2 | Value_1 | Value_2 | ..... | Value_N |
Expected output dataframe:
key_column | Map_output |
---|---|
1 | {"Column_1":"Value_1, Value_a", "Column_2":"Value_2", ......, "Column_N":"Value_N, Value_Z"} |
2 | {"Column_1":"Value_1", "Column_2":"Value_2", ......, "Column_N":"Value_N"} |
CodePudding user response:
We can use the create_map function with reduce().
from functools import reduce
import pyspark.sql.functions as func

col_list = ['col1', 'col2', 'col3'] # can use sdf.columns for all columns in the dataframe
spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
toDF(['col1', 'col2', 'col3']). \
withColumn('allcol_map',
# flatten the [lit(name), col(name)] pairs into one argument list for create_map
func.create_map(*reduce(lambda x, y: x + y, [[func.lit(k), func.col(k)] for k in col_list]))
). \
show(truncate=False)
# +-----+-----+-----+---------------------------------------------+
# |col1 |col2 |col3 |allcol_map                                   |
# +-----+-----+-----+---------------------------------------------+
# |val01|val02|val03|{col1 -> val01, col2 -> val02, col3 -> val03}|
# |val11|val12|val13|{col1 -> val11, col2 -> val12, col3 -> val13}|
# +-----+-----+-----+---------------------------------------------+
# root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: string (nullable = true)
# |-- allcol_map: map (nullable = false)
# | |-- key: string
# | |-- value: string (valueContainsNull = true)
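If you prefer not to use reduce, the same flattening of the [lit(name), col(name)] pairs can be done with itertools.chain; a minimal sketch under the same setup:
from itertools import chain
import pyspark.sql.functions as func

col_list = ['col1', 'col2', 'col3']
# chain.from_iterable flattens the nested [lit(name), col(name)] lists into one argument sequence
map_col = func.create_map(*chain.from_iterable([func.lit(k), func.col(k)] for k in col_list))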
We can also use the map_from_entries function, which requires an array of structs. Each struct's fields become a key-value pair in the resulting map. It produces the same result as above.
col_list = ['col1', 'col2', 'col3'] # can use sdf.columns for all columns in dataframe
spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
toDF(['col1', 'col2', 'col3']). \
withColumn('allcol_map',
func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
). \
show(truncate=False)
Based on the updated question, you'd like to group by a key column. Looking at the new expected output, you can use concat_ws with collect_list / collect_set to combine all / the unique column values.
col_list = ['col1', 'col2', 'col3']
spark.sparkContext.parallelize([('part0', 'val01', 'val02', 'val03'), ('part0', 'val11', 'val12', 'val13'), ('part1', 'val21', 'val22', 'val23')]). \
toDF(['key_column', 'col1', 'col2', 'col3']). \
groupBy('key_column'). \
agg(*[func.concat_ws(',', func.collect_set(k)).alias(k) for k in col_list]). \
withColumn('allcol_map',
func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
). \
show(truncate=False)
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
# |key_column|col1       |col2       |col3       |allcol_map                                                     |
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
# |part1     |val21      |val22      |val23      |{col1 -> val21, col2 -> val22, col3 -> val23}                  |
# |part0     |val01,val11|val02,val12|val03,val13|{col1 -> val01,val11, col2 -> val02,val12, col3 -> val03,val13}|
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
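If the map should also be rendered as a JSON string like the Map_output in the question, to_json can be applied on top of the map column (to_json accepts map columns on Spark 2.4+); a minimal sketch, where sdf is a hypothetical name for the grouped dataframe produced above:
# sdf stands for the grouped dataframe with the 'allcol_map' column built above
sdf.withColumn('Map_output', func.to_json(func.col('allcol_map'))). \
select('key_column', 'Map_output'). \
show(truncate=False)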
CodePudding user response:
F.from_json(F.to_json(F.struct(df.columns)), 'map<string,string>')
Example:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('Value_1', 'Value_2', 'Value_N'),
('Value_a', 'Value_b', 'Value_M')],
['Column_1', 'Column_2', 'Column_N'])
df = df.select(F.from_json(F.to_json(F.struct(df.columns)), 'map<string,string>').alias('Map_output'))
df.show(truncate=0)
# +---------------------------------------------------------------+
# |Map_output                                                     |
# +---------------------------------------------------------------+
# |{Column_1 -> Value_1, Column_2 -> Value_2, Column_N -> Value_N}|
# |{Column_1 -> Value_a, Column_2 -> Value_b, Column_N -> Value_M}|
# +---------------------------------------------------------------+
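To reproduce the grouped Map_output from the question with this approach, the same to_json/from_json trick can be combined with a groupBy aggregation first. A minimal sketch, using the question's sample data; the key_column name and the comma-join of collected values are assumptions based on the question, and collect_set does not guarantee the order of the collected values:
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, 'Value_1', 'Value_2', 'Value_N'),
     (1, 'Value_a', 'Value_2', 'Value_Z'),
     (2, 'Value_1', 'Value_2', 'Value_N')],
    ['key_column', 'Column_1', 'Column_2', 'Column_N'])

value_cols = [c for c in df.columns if c != 'key_column']

# collect_set keeps the unique values per key; concat_ws joins them into one string
grouped = df.groupBy('key_column'). \
agg(*[F.concat_ws(', ', F.collect_set(c)).alias(c) for c in value_cols])

grouped.select(
    'key_column',
    F.from_json(F.to_json(F.struct(*value_cols)), 'map<string,string>').alias('Map_output')). \
show(truncate=False)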