Mark records with error in a column for the file with a constant value for final dataframe

Time:01-22

I have a use case where, if at least one record for a particular file has the value err_present in column err, I would like to mark the rest of the records for that file with the value bad_file in the DataFrame.

Input Dataframe

+-----------+---------+
|err        |file_name|
+-----------+---------+
|err_present|f1       |
|           |f1       |
|           |f1       |
|           |f2       |
|           |f2       |
+-----------+---------+

The DataFrame above has err_present in one row for file_name f1, so I want to mark the other f1 rows with bad_file in the final DataFrame.

Desired output DF:

+-----------+---------+
|err_present|file_name|
+-----------+---------+
|err_present|       f1|
|   bad_file|       f1|
|   bad_file|       f1|
|       null|       f2|
|       null|       f2|
+-----------+---------+
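
To pin down the rule behind this output independently of Spark, here is a minimal plain-Python sketch. The function name `mark_bad_files` and the tuple layout `(err, file_name)` are illustrative assumptions, not part of the question. It only spells out the intended semantics and is not meant as a replacement for a DataFrame solution.

```python
def mark_bad_files(rows):
    """Apply the marking rule to a list of (err, file_name) tuples."""
    # Pass 1: collect every file that has at least one 'err_present' row.
    bad_files = {file_name for err, file_name in rows if err == "err_present"}

    # Pass 2: relabel each row.
    marked = []
    for err, file_name in rows:
        if err == "err_present":
            # The offending row keeps its original value.
            marked.append((err, file_name))
        elif file_name in bad_files:
            # Any other row of a file that contains an error becomes bad_file.
            marked.append(("bad_file", file_name))
        else:
            # Rows of clean files end up with no value (null in Spark).
            marked.append((None, file_name))
    return marked


rows = [("err_present", "f1"), ("", "f1"), ("", "f1"), ("", "f2"), ("", "f2")]
result = mark_bad_files(rows)
```

The two passes correspond to what any DataFrame solution has to do: first find out, per file, whether an error exists, then use that per-file flag to rewrite each row.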

Example Dataframe


df = spark.createDataFrame(
    [('err_present', 'f1'), ('', 'f1'), ('', 'f1'), ('', 'f2'), ('', 'f2')],
    ['err', 'file_name'],
)

CodePudding user response:

from pyspark.sql.functions import broadcast, col, lit, when

# Isolate the files that have an err_present row and tag each with bad_file
s = (df.where(col('err') == 'err_present')
     .select('file_name').dropDuplicates()  # one row per file, so the join cannot duplicate rows
     .withColumn('newerr', lit('bad_file')))

(df.join(broadcast(s), how='left', on='file_name')  # broadcast the small df to avoid a shuffle
 .withColumn('err', when(col('err') == 'err_present', col('err')).otherwise(col('newerr')))  # keep err_present, otherwise take newerr
 .drop('newerr')  # drop the helper column
).show()



+---------+-----------+
|file_name|        err|
+---------+-----------+
|       f1|err_present|
|       f1|   bad_file|
|       f1|   bad_file|
|       f2|       null|
|       f2|       null|
+---------+-----------+

CodePudding user response:

You can partition by the file_name and create a new column called err_present_in_group that is True if the string "err_present" is in that particular partition:

from pyspark.sql import functions as F
from pyspark.sql import Window

df.select("*",
  F.max(
    (F.col('err') == 'err_present')
  ).over(Window.partitionBy('file_name')).alias('err_present_in_group')
).show()

+-----------+---------+--------------------+
|        err|file_name|err_present_in_group|
+-----------+---------+--------------------+
|err_present|       f1|                true|
|           |       f1|                true|
|           |       f1|                true|
|           |       f2|               false|
|           |       f2|               false|
+-----------+---------+--------------------+

Then our final err_present column can be determined by applying conditions based on the values in the err and err_present_in_group columns.

df.select("*", 
  F.max(
    (F.col('err') == 'err_present')
  ).over(Window.partitionBy('file_name')).alias('err_present_in_group')
).withColumn(
    'err_present',
    F.when(
        (F.col('err_present_in_group')) & (F.col('err') == 'err_present'), 
        F.lit('err_present')
    ).when(
        (F.col('err_present_in_group')) & (F.col('err') == ''), 
        F.lit('bad_file')
    ).otherwise(None)
).select(
    'err_present','file_name'
).show()

+-----------+---------+
|err_present|file_name|
+-----------+---------+
|err_present|       f1|
|   bad_file|       f1|
|   bad_file|       f1|
|       null|       f2|
|       null|       f2|
+-----------+---------+