Convert PySpark dataframe into a list of values


I have a PySpark dataframe as shown below. I need to collapse each row of the dataframe into a Python dictionary of column:value pairs, and then convert those dictionaries into a Python list of tuples as shown below. I am using Spark 2.4.

DataFrame:

>>> myDF.show()

+------+---+---------+----------+
| fname|age| location|       dob|
+------+---+---------+----------+
|  John| 45|      USA|1985/01/05|
| David| 33|  England|2003/05/19|
|Travis| 56|    Japan|1976/08/12|
|   Tim| 75|Australia|2005/12/18|
| Harry| 35|   France|1980/10/16|
+------+---+---------+----------+

>>> table_nm = 'emp_dtl'
>>> col_list = ['age', 'location'] # from the DF

Expected output:

[('emp_dtl', 'age; location', 'fname:"John"; age:45; location:"USA"; dob:"1985/01/05"'),
 ('emp_dtl', 'age; location', 'fname:"David"; age:33; location:"England"; dob:"2003/05/19"'),
 ('emp_dtl', 'age; location', 'fname:"Travis"; age:56; location:"Japan"; dob:"1976/08/12"'),
 ('emp_dtl', 'age; location', 'fname:"Tim"; age:75; location:"Australia"; dob:"2005/12/18"'),
 ('emp_dtl', 'age; location', 'fname:"Harry"; age:35; location:"France"; dob:"1980/10/16"')]    

CodePudding user response:

Serialize each row to JSON:

from pyspark.sql.functions import lit

df.withColumn('table_nm', lit('emp_dtl')).toJSON().collect()
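Note that toJSON() yields plain JSON strings, not the exact key:value format asked for. If that format matters, the collected strings can be post-processed on the driver. A minimal sketch in plain Python (`row_json_to_tuple` is a hypothetical helper, and the sample string stands in for one element of `toJSON().collect()`):

```python
import json

def row_json_to_tuple(json_str, table_nm, col_list):
    """Turn one JSON row string (as produced by DataFrame.toJSON)
    into the (table, columns, 'key:value; ...') tuple format."""
    d = json.loads(json_str)
    # json.dumps(v) quotes strings but leaves numbers bare,
    # matching the expected output above
    kv = '; '.join(f'{k}:{json.dumps(v)}' for k, v in d.items())
    return (table_nm, '; '.join(col_list), kv)

# e.g. one string from toJSON().collect():
s = '{"fname":"John","age":45,"location":"USA","dob":"1985/01/05"}'
print(row_json_to_tuple(s, 'emp_dtl', ['age', 'location']))
```

This keeps all the formatting logic on the driver, which is fine for small collected results.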

CodePudding user response:

For some reason I was unable to use collect() in Spark 2.4, so here are two options that come close to what you wanted.

Inputs:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('John', 45, 'USA', '1985/01/05'),
     ('David', 33, 'England', '2003/05/19'),
     ('Travis', 56, 'Japan', '1976/08/12'),
     ('Tim', 75, 'Australia', '2005/12/18'),
     ('Harry', 35, 'France', '1980/10/16')],
    ['fname', 'age', 'location', 'dob'])
table_nm = 'emp_dtl'
col_list = ['age', 'location']

Script:

# Serialize every column into its own one-field JSON object, e.g. {"fname":"John"}
df = df.select([F.to_json(F.struct(c)).alias(c) for c in df.columns])
# Strip the braces from each fragment and join them with '; ' into one string column
df = df.select(F.array_join(F.array([F.translate(c, '{}', '') for c in df.columns]), '; ').alias('a'))
# Collect to the driver and build the tuples
result = [(table_nm, '; '.join(col_list), r.a) for r in df.collect()]

# [('emp_dtl',
#   'age; location',
#   '"fname":"John"; "age":45; "location":"USA"; "dob":"1985/01/05"'),
#  ('emp_dtl',
#   'age; location',
#   '"fname":"David"; "age":33; "location":"England"; "dob":"2003/05/19"'),
#  ('emp_dtl',
#   'age; location',
#   '"fname":"Travis"; "age":56; "location":"Japan"; "dob":"1976/08/12"'),
#  ('emp_dtl',
#   'age; location',
#   '"fname":"Tim"; "age":75; "location":"Australia"; "dob":"2005/12/18"'),
#  ('emp_dtl',
#   'age; location',
#   '"fname":"Harry"; "age":35; "location":"France"; "dob":"1980/10/16"')]

Or, aggregating the row strings into a single array so only one row reaches the driver:

df = df.select([F.to_json(F.struct(c)).alias(c) for c in df.columns])
df = df.select(F.array_join(F.array([F.translate(c, '{}', '') for c in df.columns]), '; ').alias('a'))
# Gather all row strings into a single array column
df = df.agg(F.collect_list('a'))
# first() returns the lone aggregated row; [0] is the array of row strings
result = [(table_nm, '; '.join(col_list), x) for x in df.first()[0]]