I currently have a JSON file whose data I load into a temporary view with the following Python (PySpark) logic:

```python
departMentData = (
    spark.read
    .option("multiLine", True)
    .option("mode", "PERMISSIVE")
    .json("C:\\Test\\data.json")
)
# createOrReplaceTempView returns None, so register the view as a separate step
departMentData.createOrReplaceTempView("vw_TestView")
```
This temporary view holds one row per department, with the IDs of that department's employees stored in an array. One employee can belong to multiple departments.
The view's schema is:
- DeptID: string
- DeptName: string
- EmployeeIDs: array<string>
The data in vw_TestView is as follows:
DeptID | DeptName | EmployeeIDs |
---|---|---|
D01 | dev | ["U1234", "U6789"] |
D02 | qa | ["U1234", "U2345"] |
Another table, Employees, holds the details of these employees:
EmpID | EmpName |
---|---|
U1234 | jon |
U6789 | smith |
U2345 | natasha |
I need the final output, in a new table, to look like this:
DeptID | DeptName | EmployeeIDs | EmployeeNames |
---|---|---|---|
D01 | dev | ["U1234", "U6789"] | ["jon", "smith"] |
D02 | qa | ["U1234", "U2345"] | ["jon", "natasha"] |
How can such a join be performed in either Databricks SQL or PySpark?
Answer:
You can use `explode` to split each department's list of employee IDs into separate rows, join those rows to Employees, and then use `collect_list` to aggregate the matching entries back into lists.
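To make the explode, join and collect steps concrete without a Spark cluster, here is a plain-Python model of the same pipeline on the sample rows from the question. It only illustrates the semantics: the helper names `explode` and `join_and_collect` are hypothetical and are not Spark APIs, and unlike this sketch, Spark does not guarantee the order in which `collect_list` gathers rows.

```python
from collections import defaultdict

# Sample rows copied from the tables in the question
departments = [
    ("D01", "dev", ["U1234", "U6789"]),
    ("D02", "qa", ["U1234", "U2345"]),
]
employees = {"U1234": "jon", "U6789": "smith", "U2345": "natasha"}


def explode(rows):
    """Emit one (DeptID, DeptName, EmpID) row per element of EmployeeIDs."""
    for dept_id, dept_name, emp_ids in rows:
        for emp_id in emp_ids:
            yield dept_id, dept_name, emp_id


def join_and_collect(rows, emp_names):
    """Inner-join exploded rows on EmpID, then group by department and collect."""
    grouped = defaultdict(lambda: ([], []))
    for dept_id, dept_name, emp_id in explode(rows):
        if emp_id not in emp_names:  # inner join: unmatched ids are dropped
            continue
        ids, names = grouped[(dept_id, dept_name)]
        ids.append(emp_id)
        names.append(emp_names[emp_id])
    return [(d, n, ids, names) for (d, n), (ids, names) in grouped.items()]


for row in join_and_collect(departments, employees):
    print(row)
```

Note that a department whose IDs all fail to match disappears entirely, which is the same effect the `INNER JOIN` has in Spark.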
Using Spark SQL:

Note: make sure Employees is available as a table or view, e.g. `EmployeeData.createOrReplaceTempView("Employees")`.
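```sql
-- One row per (department, employee ID); alias the exploded column
-- instead of relying on Spark's default name "col"
WITH dept_employees AS (
  SELECT
    DeptID,
    DeptName,
    explode(EmployeeIDs) AS EmpID
  FROM
    vw_TestView
)
-- Join each exploded ID to its employee, then rebuild the lists per department
SELECT
  d.DeptID,
  d.DeptName,
  collect_list(e.EmpID) AS EmployeeIDs,
  collect_list(e.EmpName) AS EmployeeNames
FROM
  dept_employees d
  INNER JOIN Employees e ON d.EmpID = e.EmpID
GROUP BY
  d.DeptID,
  d.DeptName
```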
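`collect_list` does not guarantee element order once rows have been shuffled by the join and `GROUP BY`, so the rebuilt lists may not follow the order of the original `EmployeeIDs` array. One way to pin the order is to explode with Spark's `posexplode`, which also emits each element's position, collect (position, id, name) entries, and sort them by position before splitting them apart (in Spark SQL this is often written with `sort_array` over a `collect_list` of structs). The plain-Python sketch below models only that idea: the shuffle is simulated with a seeded `random.shuffle`, and `posexplode` / `collect_in_array_order` here are hypothetical helpers, not Spark functions.

```python
import random
from collections import defaultdict

departments = [
    ("D01", "dev", ["U1234", "U6789"]),
    ("D02", "qa", ["U1234", "U2345"]),
]
employees = {"U1234": "jon", "U6789": "smith", "U2345": "natasha"}


def posexplode(rows):
    """Emit one (DeptID, DeptName, pos, EmpID) row per element, keeping its index."""
    for dept_id, dept_name, emp_ids in rows:
        for pos, emp_id in enumerate(emp_ids):
            yield dept_id, dept_name, pos, emp_id


def collect_in_array_order(rows, emp_names, seed=0):
    exploded = list(posexplode(rows))
    # Simulate the arbitrary row order a shuffle can produce
    random.Random(seed).shuffle(exploded)
    grouped = defaultdict(list)
    for dept_id, dept_name, pos, emp_id in exploded:
        if emp_id in emp_names:  # inner join on EmpID
            grouped[(dept_id, dept_name)].append((pos, emp_id, emp_names[emp_id]))
    result = []
    for (dept_id, dept_name), entries in sorted(grouped.items()):
        entries.sort()  # order by original array position
        ids = [emp_id for _, emp_id, _ in entries]
        names = [name for _, _, name in entries]
        result.append((dept_id, dept_name, ids, names))
    return result


for row in collect_in_array_order(departments, employees, seed=42):
    print(row)
```

Whatever order the simulated shuffle produces, sorting on the carried position restores the original array order.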
Or, using the PySpark DataFrame API:
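```python
from pyspark.sql import functions as F

# Department rows as a DataFrame (the temp view registered in the question)
dept_df = spark.table("vw_TestView")

output_df = (
    dept_df.select(
        F.col("DeptID"),
        F.col("DeptName"),
        # One row per (department, employee ID)
        F.explode("EmployeeIDs").alias("EmployeeID"),
    )
    .alias("d")
    # EmployeeData is assumed to be a DataFrame with EmpID and EmpName columns
    .join(
        EmployeeData.alias("e"),
        F.col("d.EmployeeID") == F.col("e.EmpID"),
        "inner",
    )
    .groupBy("d.DeptID", "d.DeptName")
    .agg(
        F.collect_list("e.EmpID").alias("EmployeeIDs"),
        F.collect_list("e.EmpName").alias("EmployeeNames"),
    )
)
```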
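If the Employees table is small, an alternative that avoids exploding and regrouping is to look each ID up directly, element by element. `EmployeeNames` then always lines up with `EmployeeIDs`, and unknown IDs show up as nulls instead of being dropped by the inner join. In Spark, this kind of per-element mapping can be expressed with the `transform` higher-order function against a map of ID to name; the sketch below only models the idea in plain Python, and `add_employee_names` is a hypothetical helper.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, str, List[str]]
OutRow = Tuple[str, str, List[str], List[Optional[str]]]


def add_employee_names(rows: List[Row], emp_names: Dict[str, str]) -> List[OutRow]:
    """Map every ID in EmployeeIDs to a name, preserving array order.

    IDs missing from emp_names become None (left-join-like behaviour).
    """
    return [
        (dept_id, dept_name, emp_ids, [emp_names.get(i) for i in emp_ids])
        for dept_id, dept_name, emp_ids in rows
    ]


departments = [
    ("D01", "dev", ["U1234", "U6789"]),
    ("D02", "qa", ["U1234", "U2345"]),
]
employees = {"U1234": "jon", "U6789": "smith", "U2345": "natasha"}

for row in add_employee_names(departments, employees):
    print(row)
```

This trades the shuffle of a join for a lookup structure that has to fit in memory, so it suits a small, dimension-style Employees table rather than a large one.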
Let me know if this works for you.