Home > Back-end >  Adding new column to spark dataframe by getting data lookup from other dataframe
Adding new column to spark dataframe by getting data lookup from other dataframe

Time:04-22

I am trying to join 2 dataframes using pyspark, where data frame1 has multiple records of data from look up dataframe.

>>> df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
>>> df.show()

 --- -- ------ ------------------- 
|  col1|  col2|  error_cloumn     |
 --- -- ------ ------------------- 
|  1   |     4|  date_from,date_to|
|  1   |     8|       emp_name    |
 --- -- ------ ------------------- 


>>> look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
>>> look_up_df.show()

 --- -- ------ -------------------------- 
| 'column_nm'|'clmn1'       |'comment'     |
 --- -- ------ -------------------------- 
|'date_from' |'DD-MM-YY'    | 'text msg1'|
| 'date_to'  |'DD-MM-YY'    | 'test msg2'|
| 'emp_name' |'VARCHAR(100)'| 'test msg3'|
| 'emp_type' |'VARCHAR(100)'| 'test msg4'|
 --- -- ------ -------------------------- 


Expected output : error_desc as look_up_df[column_nm]   lit('expected')   look_up_df[clmn1]  lit('and comment is')   look_up_df[comment]
output_df:
 --- -- ------ ------------------- ----------------------------------------------------------------------------------------------------------------- -
|  col1|  col2|  error_cloumn     | error_desc                                                                                                      |
 --- -- ------ ------------------- ----------------------------------------------------------------------------------------------------------------- -
|  1   |     4|  date_from,date_to|date_from expected as DD-MM-YY  and comment is text msg1, date_to expected as DD-MM-YY  and comment is text msg2 |
|  1   |     8|       emp_name    |emp_name should be VARCHAR(100) and comment is test msg3                                                         |
 --- -- ------ ------------------- ----------------------------------------------------------------------------------------------------------------- -

I am trying using blow code:

from pyspark.sql import functions as F
df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_cloumn'])
df.show()

look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text msg1'),('date_to', 'DD-MM-YY', 'test msg2'),('emp_name', 'VARCHAR(100)', 'test msg3'),('emp_type', 'VARCHAR(100)', 'test msg4')], ['column_nm', 'clmn1', 'comment'])
look_up_df.show()

output_df = df.join(look_up_df, df["error_cloumn"] == look_up_df["column_nm"]).withColumn("error_desc",F.concat(F.Col('column_nm'),F.lit(' expected as '),F.Col('clmn1').lit(' and comment is '),.Col('comment'),))

This code working for one record, but failing for multiple columns like date_from,date_to in records

CodePudding user response:

I would suggest splitting error_cloumn in df, exploding (both contained in pyspark.sql.functions module) and then joining. You would get multiple rows and, grouping by col1, col2 (assuming those could be the grouping keys), you could aggregate text in the way you presented in the results.

Please let me know if you need more support on that

CodePudding user response:

from pyspark.sql import functions as F
from pyspark.sql.functions import concat, lit, expr, when, explode, split, collect_list, concat_ws

df = spark.createDataFrame([(1, 4, 'date_from,date_to'),(1, 8, 'emp_name')], ['col1', 'col2', 'error_column'])
df = df.withColumn('all_values',explode(split('error_column',',')))
df.show()

look_up_df = spark.createDataFrame([('date_from','DD-MM-YY', ' text_msg1'),('date_to', 'DD-MM-YY', 'test_msg2'),('emp_name', 'VARCHAR(100)', 'test_msg3'),('emp_type', 'VARCHAR(100)', 'test_msg4')], ['column_name', 'clmn1', 'comment'])
look_up_df = (
  look_up_df.withColumn('msg', 
                        when(look_up_df.column_name.like('           
  • Related