I have a PySpark DataFrame as shown below. I need to collapse each DataFrame row into a Python dictionary of column:value pairs and, finally, convert those dictionaries into a Python list of tuples as shown below. I am using Spark 2.4.
DataFrame:
>>> myDF.show()
+------+---+---------+----------+
| fname|age| location|       dob|
+------+---+---------+----------+
|  John| 45|      USA|1985/01/05|
| David| 33|  England|2003/05/19|
|Travis| 56|    Japan|1976/08/12|
|   Tim| 75|Australia|2005/12/18|
| Harry| 35|   France|1980/10/16|
+------+---+---------+----------+
>>> table_nm = 'emp_dtl'
>>> col_list = ['age', 'location'] # from the DF
Expected output:
[('emp_dtl', 'age; location', 'fname:"John"; age:45; location:"USA"; dob:"1985/01/05"'),
('emp_dtl', 'age; location', 'fname:"David"; age:33; location:"UK"; dob:"2003/05/19"'),
('emp_dtl', 'age; location', 'fname:"Travis"; age:56; location:"Japan"; dob:"1976/08/12"'),
('emp_dtl', 'age; location', 'fname:"Tim"; age:75; location:"Australia"; dob:"2005/12/18"'),
('emp_dtl', 'age; location', 'fname:"Harry"; age:35; location:"France"; dob:"1980/10/16"')]
CodePudding user response:
Send it to JSON: toJSON() serializes every row as a JSON string of column:value pairs.
from pyspark.sql.functions import lit
df.withColumn('table_nm', lit('emp_dtl')).toJSON().collect()
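If collecting to the driver is acceptable, those JSON strings can then be reshaped into the exact tuples shown in the question with the standard json module. A minimal sketch, assuming myDF, table_nm and col_list are defined as in the question:
import json
# one JSON string per row, e.g. '{"fname":"John","age":45,...}'
rows = myDF.toJSON().collect()
result = []
for js in rows:
    d = json.loads(js)  # per-row dict of column:value pairs
    body = '; '.join('{}:{}'.format(k, json.dumps(v)) for k, v in d.items())
    result.append((table_nm, '; '.join(col_list), body))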
CodePudding user response:
For some reason I was unable to use collect() in Spark 2.4, so here are two options that come close to what you want.
Inputs:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('John', 45, 'USA', '1985/01/05'),
     ('David', 33, 'England', '2003/05/19'),
     ('Travis', 56, 'Japan', '1976/08/12'),
     ('Tim', 75, 'Australia', '2005/12/18'),
     ('Harry', 35, 'France', '1980/10/16')],
    ['fname', 'age', 'location', 'dob'])
table_nm = 'emp_dtl'
col_list = ['age', 'location']
Script:
# serialize each value to a single-field JSON object, e.g. {"age":45}
df = df.select([F.to_json(F.struct(c)).alias(c) for c in df.columns])
# strip the braces and join the "key":value fragments with "; "
df = df.select(F.array_join(F.array([F.translate(c, '{}', '') for c in df.columns]), '; ').alias('a'))
# collect the strings and build the list of tuples on the driver
result = [(table_nm, '; '.join(col_list), r.a) for r in df.collect()]
# [('emp_dtl',
# 'age; location',
# '"fname":"John"; "age":45; "location":"USA"; "dob":"1985/01/05"'),
# ('emp_dtl',
# 'age; location',
# '"fname":"David"; "age":33; "location":"England"; "dob":"2003/05/19"'),
# ('emp_dtl',
# 'age; location',
# '"fname":"Travis"; "age":56; "location":"Japan"; "dob":"1976/08/12"'),
# ('emp_dtl',
# 'age; location',
# '"fname":"Tim"; "age":75; "location":"Australia"; "dob":"2005/12/18"'),
# ('emp_dtl',
# 'age; location',
# '"fname":"Harry"; "age":35; "location":"France"; "dob":"1980/10/16"')]
Or, aggregating everything into a single array column and pulling back just one row:
df = df.select([F.to_json(F.struct(c)).alias(c) for c in df.columns])
df = df.select(F.array_join(F.array([F.translate(c, '{}', '') for c in df.columns]), '; ').alias('a'))
# aggregate all row strings into one array column instead of collecting Row objects
df = df.agg(F.collect_list('a'))
# first()[0] returns that array; build the tuples on the driver
result = [(table_nm, '; '.join(col_list), x) for x in df.first()[0]]
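One difference from the expected output in the question: these strings quote the keys ("age":45) rather than leaving them bare (age:45). If the exact format matters, a regexp_replace applied right after the array_join step should strip the quotes around the keys; a minimal sketch, assuming column names contain only word characters:
# turn "age":45 into age:45 (run this before the collect / agg step)
df = df.withColumn('a', F.regexp_replace('a', r'"(\w+)":', '$1:'))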