I have a use case where, if at least one record for a particular file has the value err_present in column err, I would like to mark the rest of the records for the same file with the value bad_file in the DataFrame.
Input DataFrame:
+-----------+---------+
|        err|file_name|
+-----------+---------+
|err_present|       f1|
|           |       f1|
|           |       f1|
|           |       f2|
|           |       f2|
+-----------+---------+
In the DataFrame above, one of the f1 rows has err_present, so I want to mark the other f1 rows with bad_file in the final DataFrame.
Desired output DataFrame:
+-----------+---------+
|err_present|file_name|
+-----------+---------+
|err_present|       f1|
|   bad_file|       f1|
|   bad_file|       f1|
|       null|       f2|
|       null|       f2|
+-----------+---------+
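As a Spark-free reference for the rule, here is a minimal plain-Python sketch that applies it to a list of (err, file_name) tuples, which can be handy for checking expected output on small samples. The function name `mark_bad_files` is hypothetical, and `None` stands in for Spark's null.

```python
def mark_bad_files(rows):
    """Return new (err, file_name) tuples following the bad_file rule.

    rows: list of (err, file_name) tuples, as in the example DataFrame.
    """
    # Pass 1: collect every file that has at least one err_present row.
    bad_files = {file_name for err, file_name in rows if err == "err_present"}

    result = []
    # Pass 2: relabel each row based on its own err value and its file.
    for err, file_name in rows:
        if err == "err_present":
            result.append((err, file_name))          # keep the original marker
        elif file_name in bad_files:
            result.append(("bad_file", file_name))   # same file as an error row
        else:
            result.append((None, file_name))         # clean file: null in the desired output
    return result


rows = [("err_present", "f1"), ("", "f1"), ("", "f1"), ("", "f2"), ("", "f2")]
for row in mark_bad_files(rows):
    print(row)
```

The two passes correspond to the two things any Spark solution has to do: find the affected files, then relabel every row of those files.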
Example DataFrame:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('err_present', 'f1'), ('', 'f1'), ('', 'f1'), ('', 'f2'), ('', 'f2')],
    ['err', 'file_name'])
Answer:
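from pyspark.sql.functions import broadcast, col, lit, when

# Isolate the files that have at least one err_present row and tag them with bad_file.
# distinct() keeps one row per file, so the join below cannot duplicate rows
# when a file contains more than one err_present record.
s = (df.where(col('err') == 'err_present')
       .select('file_name').distinct()
       .withColumn('newerr', lit('bad_file')))

(df.join(broadcast(s), on='file_name', how='left')  # broadcast the small lookup DataFrame to avoid a shuffle
   .withColumn('err', when(col('err') == 'err_present', col('err')).otherwise(col('newerr')))  # keep err_present, else use newerr
   .drop('newerr')  # drop the helper column
).show()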
+---------+-----------+
|file_name|        err|
+---------+-----------+
|       f1|err_present|
|       f1|   bad_file|
|       f1|   bad_file|
|       f2|       null|
|       f2|       null|
+---------+-----------+
Answer:
You can partition by file_name and create a new column called err_present_in_group that is true if the string "err_present" appears in any row of that partition:
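from pyspark.sql import functions as F
from pyspark.sql.window import Window
# For each file_name, max() over the boolean (err == 'err_present') is true
# if at least one row in that file carries err_present.
df.select("*",
    F.max(
        (F.col('err') == 'err_present')
    ).over(Window.partitionBy('file_name')).alias('err_present_in_group')
).show()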
+-----------+---------+--------------------+
|        err|file_name|err_present_in_group|
+-----------+---------+--------------------+
|err_present|       f1|                true|
|           |       f1|                true|
|           |       f1|                true|
|           |       f2|               false|
|           |       f2|               false|
+-----------+---------+--------------------+
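To make the window step above concrete: a max over a boolean column is true when any row in the partition is true, because false orders before true. The plain-Python sketch below (no Spark; the helper name `err_present_in_group` is hypothetical and simply mirrors the column name) groups rows by file_name, computes the same per-file flag with `any()`, and attaches it to every row. That last part is what distinguishes a window aggregate from `groupBy().agg()`, which would collapse each file to a single row.

```python
from collections import defaultdict


def err_present_in_group(rows):
    """Attach a per-file flag to each (err, file_name) row, like a window max."""
    # Build the "partitions": one list of booleans per file_name.
    groups = defaultdict(list)
    for err, file_name in rows:
        groups[file_name].append(err == "err_present")

    # max() over booleans is True if any element is True, so any() is equivalent.
    flag_by_file = {file_name: any(flags) for file_name, flags in groups.items()}

    # A window keeps every input row and adds the partition-level value to it.
    return [(err, file_name, flag_by_file[file_name]) for err, file_name in rows]


rows = [("err_present", "f1"), ("", "f1"), ("", "f1"), ("", "f2"), ("", "f2")]
for row in err_present_in_group(rows):
    print(row)
```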
Then the final err_present column can be determined by applying conditions to the values in the err and err_present_in_group columns.
df.select("*",
    F.max(
        (F.col('err') == 'err_present')
    ).over(Window.partitionBy('file_name')).alias('err_present_in_group')
).withColumn(
    'err_present',
    F.when(
        # The row that carries the error keeps its marker.
        (F.col('err_present_in_group')) & (F.col('err') == 'err_present'),
        F.lit('err_present')
    ).when(
        # The other (empty) rows of an affected file become bad_file.
        (F.col('err_present_in_group')) & (F.col('err') == ''),
        F.lit('bad_file')
    ).otherwise(None)  # rows of files without err_present stay null
).select(
    'err_present', 'file_name'
).show()
+-----------+---------+
|err_present|file_name|
+-----------+---------+
|err_present|       f1|
|   bad_file|       f1|
|   bad_file|       f1|
|       null|       f2|
|       null|       f2|
+-----------+---------+
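The when/otherwise chain above is a per-row decision on two inputs, err and err_present_in_group. A plain-Python sketch of that decision (the function name `final_err` is hypothetical; `None` stands in for Spark's null) makes one edge case visible: a row whose err is neither 'err_present' nor '' (for example a real null) falls through to otherwise(None) even when its file is flagged. The join-based answer would mark such a row as bad_file instead, so the two answers only agree when the non-error rows hold empty strings, as in the example data.

```python
def final_err(err, in_group):
    """Mirror of the F.when(...).when(...).otherwise(None) chain for one row."""
    if in_group and err == "err_present":
        return "err_present"   # first branch: the row that carries the error
    if in_group and err == "":
        return "bad_file"      # second branch: other (empty) rows of the same file
    return None                # otherwise(None): unflagged files, or any other err value


flagged = [("err_present", True), ("", True), ("", True), ("", False), ("", False)]
print([final_err(err, flag) for err, flag in flagged])
```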