I have a dataframe in PySpark that looks like this:
CustomerID counts
3004000304 {'MAJOR APPLIANCES, Home Office' -> 2, 'HOME OFFICE, ACCESSORIES' -> 1, 'OTHER STUFF' -> 2}
3004002756 {'MAJOR APPLIANCES, ACCESSORIES' -> 2}
3004000304 {'ACCESSORIES, OTHER STUFF' -> 2}
I want something like this
CustomerID counts
3004000304 {'MAJOR APPLIANCES' -> 2, 'Home Office' -> 2, 'HOME OFFICE' -> 1, 'ACCESSORIES' -> 3, 'OTHER STUFF' -> 4}
3004002756 {'MAJOR APPLIANCES' -> 2, 'ACCESSORIES' -> 2}
Basically, I want each comma-separated key in my dictionary/Map column to be split into its parts, with the value assigned to each part (and values summed where the same part appears more than once).
A solution in either PySpark or pandas would work.
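In plain Python terms, the split-and-sum I'm after can be sketched like this (a minimal illustration of the intended logic only, not the PySpark code):

```python
from collections import Counter

def split_counts(d):
    """Credit each comma-separated part of a key with the key's count."""
    out = Counter()
    for key, n in d.items():
        for part in key.split(','):
            out[part.strip()] += n
    return out

# The two rows for CustomerID 3004000304, merged:
merged = (split_counts({'MAJOR APPLIANCES, Home Office': 2,
                        'HOME OFFICE, ACCESSORIES': 1,
                        'OTHER STUFF': 2})
          + split_counts({'ACCESSORIES, OTHER STUFF': 2}))
# merged now holds e.g. 'ACCESSORIES' -> 3 and 'OTHER STUFF' -> 4
```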
For pandas, my df looks like this:
CustomerID counts
3004000304 {'MAJOR APPLIANCES, Home Office' : 2, 'HOME OFFICE, ACCESSORIES' : 1, 'OTHER STUFF' : 2}
3004002756 {'MAJOR APPLIANCES, ACCESSORIES' : 2}
MY APPROACH: I converted the PySpark DataFrame to a pandas DataFrame, since I'm new to PySpark and don't have much experience with it. A direct PySpark answer would be much appreciated, but a pandas implementation would also be great.
f = lambda x: dict(zip(x['Description'], x['Counts']))
df = categories.groupby(level=0).apply(f).to_frame('Counts')
print(df)

df.groupBy("CustomerID").agg(
    F.map_from_entries(
        F.collect_list(
            F.struct("Description", "Counts"))).alias("Description_Counts")
).show(truncate=False)
This code collects the pairs as-is; it does not split the comma-separated keys or assign the values to each part.
Edit
{'Customer ID': {8548: '3004000304', 8549: '3004002756'},
 'Desc': {8548: {'MAJOR APPLIANCES': 2, 'Home Office': 2, 'ACCESSORIES': 1, 'OTHER STUFF': 2},
          8549: {'MAJOR APPLIANCES': 2, 'ACCESSORIES': 2}}}
CodePudding user response:
Using only PySpark:
import pyspark.sql.functions as f

df = (
    df
    # one row per (key, value) entry of the map column
    .select('CustomerID', f.explode(f.col('counts')))
    # split the comma-separated keys and give each part its own row
    .withColumn('key_separated', f.explode(f.split(f.col('key'), ',')))
    .withColumn('key_separated_trimmed', f.trim(f.col('key_separated')))
    # sum the values per customer and individual key
    .groupBy('CustomerID', 'key_separated_trimmed')
    .agg(f.sum(f.col('value')).alias('value'))
    # rebuild one map per customer
    .groupBy('CustomerID')
    .agg(f.collect_list(f.col('key_separated_trimmed')).alias('keys'),
         f.collect_list(f.col('value')).alias('values'))
    .withColumn('counts', f.map_from_arrays(f.col('keys'), f.col('values')))
    .drop('keys', 'values')
)
CodePudding user response:
With the given dictionary:
# widen each dict into columns, then stack into long format
df1 = df.Desc.apply(pd.Series).stack().reset_index(1)
# split the comma-separated keys and explode into one row per part
df1['level_1'] = df1.level_1.str.split(', *', regex=True)
df1 = df1.explode('level_1').rename({0: 'counts'}, axis=1)
# rebuild one dict per row and join back
df.join(df1.groupby(df1.index).apply(lambda x: pd.Series(x.set_index('level_1').to_dict())))
Customer ID ... counts
8548 3004000304 ... {'MAJOR APPLIANCES': 2.0, 'Home Office': 2.0, ...
8549 3004002756 ... {'MAJOR APPLIANCES': 2.0, 'ACCESSORIES': 2.0}
[2 rows x 3 columns]
where
df = pd.DataFrame({
    'Customer ID': {8548: '3004000304', 8549: '3004002756'},
    'Desc': {8548: {'MAJOR APPLIANCES': 2, 'Home Office': 2, 'ACCESSORIES': 1, 'OTHER STUFF': 2},
             8549: {'MAJOR APPLIANCES': 2, 'ACCESSORIES': 2}},
})
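The same recipe also works on the question's original comma-separated keys; the frame below is an assumed reconstruction of the pre-Edit pandas data, run end-to-end:

```python
import pandas as pd

# Assumed reconstruction of the question's original (comma-separated) frame
df = pd.DataFrame({
    'Customer ID': {8548: '3004000304', 8549: '3004002756'},
    'Desc': {8548: {'MAJOR APPLIANCES, Home Office': 2,
                    'HOME OFFICE, ACCESSORIES': 1,
                    'OTHER STUFF': 2},
             8549: {'MAJOR APPLIANCES, ACCESSORIES': 2}},
})

# Same steps as the answer: widen, stack, split keys, explode, rebuild dicts
df1 = df.Desc.apply(pd.Series).stack().reset_index(1)
df1['level_1'] = df1.level_1.str.split(', *', regex=True)
df1 = df1.explode('level_1').rename({0: 'counts'}, axis=1)
out = df.join(df1.groupby(df1.index).apply(
    lambda x: pd.Series(x.set_index('level_1').to_dict())))
```

Note the counts come out as floats, because stacking the widened frame introduces NaN-typed columns.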