Hi I am creating a generic function or class to add n numbers of datasets but I am unable to find the proper logic to do that, I put all codes below and highlight the section in which I want some help. if you find any problem in understanding my code then please ping me.
import pyspark
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
data_fact = [["1", "sravan", "company 1","100"],
["2", "ojaswi", "company 1","200"],
["3", "rohith", "company 2","300"],
["4", "sridevi", "company 1","400"],
["5", "bobby", "company 1","500"]]
# specify column names
columns = ['ID', 'NAME', 'Company','Amount']
# creating a dataframe from the lists of data
df_fact = spark.createDataFrame(data_fact, columns)
Department_table = [["1", "45000", "IT"],
["2", "145000", "Manager"],
["6", "45000", "HR"],
["5", "34000", "Sales"]]
# specify column names
columns1 = ['ID', 'salary', 'department']
df_Department = spark.createDataFrame(Department_table, columns1)
Leave_Table = [["1", "Sick Leave"],
["2", "Casual leave"],
["3", "Casual leave"],
["4", "Earned Leave"],
["4", "Sick Leave"] ]
# specify column names
columns2 = ['ID', 'Leave_type']
df_Leave = spark.createDataFrame(Leave_Table, columns2)
Phone_Table = [["1", "Apple"],
["2", "Samsung"],
["3", "MI"],
["4", "Vivo"],
["4", "Apple"] ]
# specify column names
columns3 = ['ID', 'Phone_type']
df_Phone = spark.createDataFrame(Phone_Table, columns3)
Df_join = df_fact.join(df_Department,df_fact.ID ==df_Department.ID,"inner")\
.join(df_Phone,df_fact.ID ==df_Phone.ID,"inner")\
.join(df_Leave,df_fact.ID ==df_Leave.ID,"inner")\
.select(df_fact.Amount,df_Department.ID,df_Department.salary,df_Department.department,df_Phone.Phone_type,df_Leave.Leave_type)
display(Df_join)
basically, I want to generalise this stuff for n numbers of datasets
Df_join = df_fact.join(df_Department,df_fact.ID ==df_Department.ID,"inner")\
.join(df_Phone,df_fact.ID ==df_Phone.ID,"inner")\
.join(df_Leave,df_fact.ID ==df_Leave.ID,"inner")\
.select(df_fact.Amount,df_Department.ID,df_Department.salary,df_Department.department,df_Phone.Phone_type,df_Leave.Leave_type) ```
CodePudding user response:
Since you're using inner
join in all dataframe, if you want to prevent the bulky code, you can use the .reduce()
in functools to do the joining and select the column that you want:
df = reduce(lambda x, y: x.join(y, on='id', how='inner'), [df_fact, df_Department, df_Leave, df_Phone])
df.show(10, False)
--- ------ --------- ------ ------ ---------- ------------ ----------
|ID |NAME |Company |Amount|salary|department|Leave_type |Phone_type|
--- ------ --------- ------ ------ ---------- ------------ ----------
|1 |sravan|company 1|100 |45000 |IT |Sick Leave |Apple |
|2 |ojaswi|company 1|200 |145000|Manager |Casual leave|Samsung |
--- ------ --------- ------ ------ ---------- ------------ ----------
https://docs.python.org/3/library/functools.html#functools.reduce
Edit 1: If you need to indicate different key in different joining, given that you have already renamed the columns:
df = reduce(lambda x, y: x.join(y, on=list(set(x.columns)&set(y.columns)), how='inner'), [df_fact, df_Department, df_Leave, df_Phone])
df.show(10, False)
--- ------ --------- ------ ------ ---------- ------------ ----------
|ID |NAME |Company |Amount|salary|department|Leave_type |Phone_type|
--- ------ --------- ------ ------ ---------- ------------ ----------
|1 |sravan|company 1|100 |45000 |IT |Sick Leave |Apple |
|2 |ojaswi|company 1|200 |145000|Manager |Casual leave|Samsung |
--- ------ --------- ------ ------ ---------- ------------ ----------