I have a dataset like the one below:
+------------------------------------------------------+--------------------------------------------------------------+---------------------------------------------------------------------------------------+----------+-----------------------------------------------------+-------------+
|emp_id                                                |sik_id                                                        |modification_date                                                                      |file_name |org_path                                             |received_date|
+------------------------------------------------------+--------------------------------------------------------------+---------------------------------------------------------------------------------------+----------+-----------------------------------------------------+-------------+
|[85627230-s387s09, 98722016-s015s05, 40022035-s008s21]|[f13c1320-5c8f3daas5cd, f13c1384-6659-4831, 4831-aaf1-5c8f3da]|[2021-04-19T11:43:32.617953Z, 2021-04-19T11:43:32.858290Z, 2021-04-19T11:43:34.027082Z]|test1.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/|2022-01-25   |
|[67dm34-4334, 8723gv6-2022, 6f7m99-2244-ki856]        |[66d9-4888-aaf1, aaf1-5c8f3da1d5cd, f13c1884-66d9]            |[2020-11-12T23:22:05.433107Z, 2020-11-12T20:16:51.339437Z, 2020-11-11T20:59:03.758126Z]|test2.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/|2022-01-25   |
+------------------------------------------------------+--------------------------------------------------------------+---------------------------------------------------------------------------------------+----------+-----------------------------------------------------+-------------+
Its schema contains both array and string fields:
>>> df.printSchema()
root
 |-- emp_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sik_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- modification_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- file_name: string (nullable = false)
 |-- org_path: string (nullable = false)
 |-- received_date: string (nullable = false)
I would like to get a result like the one below, where each element of emp_id, sik_id and modification_date ends up in its own row together with the matching file_name, org_path and received_date:
+-----------------+---------------------+---------------------------+----------+-----------------------------------------------------+-------------+
|emp_id           |sik_id               |modification_date          |file_name |org_path                                             |received_date|
+-----------------+---------------------+---------------------------+----------+-----------------------------------------------------+-------------+
|85627230-s387s09 |f13c1320-5c8f3daas5cd|2021-04-19T11:43:32.617953Z|test1.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/|2022-01-25   |
|98722016-s015s05 |f13c1384-6659-4831   |2021-04-19T11:43:32.858290Z|test1.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/|2022-01-25   |
|40022035-s008s21 |4831-aaf1-5c8f3da    |2021-04-19T11:43:34.027082Z|test1.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/|2022-01-25   |
|67dm34-4334      |66d9-4888-aaf1       |2020-11-12T23:22:05.433107Z|test2.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/|2022-01-25   |
|8723gv6-2022     |aaf1-5c8f3da1d5cd    |2020-11-12T20:16:51.339437Z|test2.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/|2022-01-25   |
|6f7m99-2244-ki856|f13c1884-66d9        |2020-11-11T20:59:03.758126Z|test2.json|s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/|2022-01-25   |
+-----------------+---------------------+---------------------------+----------+-----------------------------------------------------+-------------+
I tried using zip() on these fields, but it looks like zip doesn't work on a mix of array and string fields: I was getting a type mismatch exception.
Can someone please help me with the right solution?
Thanks in advance.
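To pin down the reshaping being asked for, here is the same transformation on plain Python lists, with no Spark involved. It is only a model of the semantics: the `rows` data layout and the helper name `flatten_row` are hypothetical, and it assumes the three arrays in a row have equal length (Python's built-in `zip` silently stops at the shortest one).

```python
# Hypothetical model of the desired reshaping, using plain Python data.
# Each input row holds three parallel lists plus three scalar fields.
rows = [
    {
        "emp_id": ["85627230-s387s09", "98722016-s015s05", "40022035-s008s21"],
        "sik_id": ["f13c1320-5c8f3daas5cd", "f13c1384-6659-4831", "4831-aaf1-5c8f3da"],
        "modification_date": [
            "2021-04-19T11:43:32.617953Z",
            "2021-04-19T11:43:32.858290Z",
            "2021-04-19T11:43:34.027082Z",
        ],
        "file_name": "test1.json",
        "org_path": "s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/",
        "received_date": "2022-01-25",
    },
]

ARRAY_COLS = ("emp_id", "sik_id", "modification_date")
SCALAR_COLS = ("file_name", "org_path", "received_date")


def flatten_row(row):
    """Yield one output dict per position in the parallel arrays."""
    # zip() pairs the i-th elements of the three lists (stopping at the shortest)...
    for values in zip(*(row[c] for c in ARRAY_COLS)):
        out = dict(zip(ARRAY_COLS, values))
        # ...and the scalar fields are repeated on every output row.
        out.update({c: row[c] for c in SCALAR_COLS})
        yield out


result = [out for row in rows for out in flatten_row(row)]
for r in result:
    print(r["emp_id"], r["sik_id"], r["file_name"])
```

In Spark the "zip" step has to happen on the array columns themselves, which is what the answers below do.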
Answer:
Combine the SQL functions arrays_zip and inline:
# arrays_zip (Spark 2.4+) pairs the i-th elements into structs; inline emits one row per struct
df = df.selectExpr('inline(arrays_zip(emp_id, sik_id, modification_date))', 'file_name', 'org_path', 'received_date')
df.show(truncate=False)
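For intuition about what `arrays_zip` followed by `inline` does to each row, here is a plain-Python model. It is a sketch, not Spark's implementation, and it assumes these behaviors of the Spark functions: `arrays_zip` pads shorter arrays with null for the missing positions (mimicked here with `itertools.zip_longest`) and returns null if any input array is null, while `inline` emits no rows for a null or empty array. The names `arrays_zip_model` and `inline_model` are hypothetical.

```python
from itertools import zip_longest


def arrays_zip_model(*arrays, names):
    """Model of SQL arrays_zip: one struct (here a dict) per position."""
    # Assumed Spark behavior: a null input array makes the whole result null.
    if any(a is None for a in arrays):
        return None
    # Shorter arrays are padded with None, like the nulls arrays_zip produces.
    return [dict(zip(names, vals)) for vals in zip_longest(*arrays)]


def inline_model(structs, **scalars):
    """Model of SQL inline: one output row per struct, scalars repeated."""
    # Assumed Spark behavior: a null or empty array yields no rows at all.
    for s in structs or []:
        yield {**s, **scalars}


zipped = arrays_zip_model(
    ["67dm34-4334", "8723gv6-2022", "6f7m99-2244-ki856"],
    ["66d9-4888-aaf1", "aaf1-5c8f3da1d5cd"],  # deliberately one element short
    names=("emp_id", "sik_id"),
)
rows = list(inline_model(zipped, file_name="test2.json"))
for r in rows:
    print(r)
```

If input rows whose arrays are null or empty must survive, Spark SQL also provides `inline_outer` (and `explode_outer`), which emit a row of nulls instead of dropping the input row.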
Answer:
Try with explode:
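from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()

arrayData = [(['85627230-s387s09', '98722016-s015s05', '40022035-s008s21'], 'test1.json')]
df = spark.createDataFrame(data=arrayData, schema=['emp_id', 'file_name'])
# explode emits one row per array element; alias it so the column isn't named "col"
df2 = df.select(df.file_name, explode(df.emp_id).alias('emp_id'))
df2.printSchema()
df2.show()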
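Exploding a single array like this works, but in the question the three arrays have to stay aligned. Spark allows only one generator such as `explode` per `select` clause, and chaining one `explode` per array (for example through successive `withColumn` calls) pairs every element with every other element, a cross product, instead of matching them by position. The plain-Python sketch below does not use Spark; it only models the two outcomes for one input row.

```python
from itertools import product

emp_id = ["85627230-s387s09", "98722016-s015s05", "40022035-s008s21"]
sik_id = ["f13c1320-5c8f3daas5cd", "f13c1384-6659-4831", "4831-aaf1-5c8f3da"]

# One explode per array, applied one after another: every combination survives.
crossed = list(product(emp_id, sik_id))

# Zipping first (what arrays_zip does) keeps elements matched by position.
aligned = list(zip(emp_id, sik_id))

print(len(crossed), len(aligned))  # 9 3
```

With three arrays of length 3 the cross product grows to 27 rows per input row, which is why the other answers zip the arrays before exploding.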
Answer:
You can use arrays_zip together with explode:
Data Preparation
import pandas as pd
from pyspark.sql import functions as F

d = {
    'emp_id': [
        ['85627230-s387s09', '98722016-s015s05', '40022035-s008s21'],
        ['67dm34-4334', '8723gv6-2022', '6f7m99-2244-ki856'],
    ],
    'sik_id': [
        ['f13c1320-5c8f3daas5cd', 'f13c1384-6659-4831', '4831-aaf1-5c8f3da'],
        ['66d9-4888-aaf1', 'aaf1-5c8f3da1d5cd', 'f13c1884-66d9'],
    ],
    'modification_date': [
        ['2021-04-19T11:43:32.617953Z', '2021-04-19T11:43:32.858290Z', '2021-04-19T11:43:34.027082Z'],
        ['2020-11-12T23:22:05.433107Z', '2020-11-12T20:16:51.339437Z', '2020-11-11T20:59:03.758126Z'],
    ],
    'file_name': ['test1.json', 'test2.json'],
    'org_path': ['s3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-24/', 's3://my-bucket/test_prefix/vualt2/rcvd_dt=2022-01-25/'],
    'received_date': ['2022-01-25', '2022-01-25'],
}
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame(d)
sparkDF = spark.createDataFrame(df)
sparkDF.printSchema()
root
 |-- emp_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sik_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- modification_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- file_name: string (nullable = true)
 |-- org_path: string (nullable = true)
 |-- received_date: string (nullable = true)
Array Zip & Explode
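# arrays_zip pairs the i-th elements of the three arrays into one struct,
# and explode then emits one row per struct
sparkDF = (
    sparkDF
    .withColumn("exploded_tmp", F.arrays_zip(F.col('emp_id'), F.col('sik_id'), F.col('modification_date')))
    .withColumn("exploded", F.explode("exploded_tmp"))
)

# Pull the struct fields back out as top-level columns next to the scalar ones
sparkDF.select(
    F.col('exploded.emp_id'),
    F.col('exploded.sik_id'),
    F.col('exploded.modification_date'),
    F.col('file_name'),
    F.col('org_path'),
    F.col('received_date'),
).show()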
+-----------------+--------------------+--------------------+----------+--------------------+-------------+
|           emp_id|              sik_id|   modification_date| file_name|            org_path|received_date|
+-----------------+--------------------+--------------------+----------+--------------------+-------------+
| 85627230-s387s09|f13c1320-5c8f3daa...|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...|   2022-01-25|
| 98722016-s015s05|  f13c1384-6659-4831|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...|   2022-01-25|
| 40022035-s008s21|   4831-aaf1-5c8f3da|2021-04-19T11:43:...|test1.json|s3://my-bucket/te...|   2022-01-25|
|      67dm34-4334|      66d9-4888-aaf1|2020-11-12T23:22:...|test2.json|s3://my-bucket/te...|   2022-01-25|
|     8723gv6-2022|   aaf1-5c8f3da1d5cd|2020-11-12T20:16:...|test2.json|s3://my-bucket/te...|   2022-01-25|
|6f7m99-2244-ki856|       f13c1884-66d9|2020-11-11T20:59:...|test2.json|s3://my-bucket/te...|   2022-01-25|
+-----------------+--------------------+--------------------+----------+--------------------+-------------+