I have a Spark dataframe as follows:
spark.sql("""
SELECT * from all_trans
""").show()
+---------------+-------------+-----------+------------------+-----------------+-----------+----------+
| transaction_id|card_event_id|card_pos_id|card_point_country|transaction_label|module_name| post_date|
+---------------+-------------+-----------+------------------+-----------------+-----------+----------+
|0P2117292543428| 2502723025| null| CZ| EDU| mcc|2022-02-10|
| 0P211729824944| 2502723477| null| CZ| EDU| mcc|2022-02-10|
| 0P31172950208| 2502723587| null| CZ| EDU| mcc|2022-02-10|
|0P2117294027213| 2502726454| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117294027213| 2502726454| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117293581427| 2502729360| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117293581427| 2502729360| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117292967336| 2502729724| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117292967336| 2502729724| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117292659416| 2502730642| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117292659416| 2502730642| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117293159304| 2502731764| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117293159304| 2502731764| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117293237687| 2502732381| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117293237687| 2502732381| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117293548610| 2502733071| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117293548610| 2502733071| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117293678239| 2502736684| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
|0P2117293678239| 2502736684| E3KB2938| CZ| FUN04| mcc|2022-02-10|
|0P2117293840924| 2502737447| E3KB2938| CZ| FUN0402007| regex|2022-02-10|
+---------------+-------------+-----------+------------------+-----------------+-----------+----------+
One transaction_id can have more than one transaction_label.
For each transaction_id, I want to automatically go through all of its transaction_label values and check whether they match: the label from the mcc module should equal the first 5 characters of the label from the regex module.
I had in mind logic like this (pseudocode):
df.foreach(lambda x:
    (transaction_id.transaction_label where module_name=='mcc') == (left(transaction_id.transaction_label, 5) where module_name=='regex'))
But I don't know how to compare rows that share the same transaction_id in Spark. Converting to Pandas fails because the driver runs out of memory.
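To make the matching rule concrete, here is the same comparison in plain Python on a few of the sample rows. It is only an illustration of the logic, not a Spark solution, since it assumes all rows fit in memory. The function name `check_labels` is hypothetical, and it assumes at most one label per module for each transaction (a later duplicate would overwrite an earlier one).

```python
from collections import defaultdict

# A few sample rows from the question: (transaction_id, transaction_label, module_name)
rows = [
    ("0P2117292543428", "EDU", "mcc"),
    ("0P2117294027213", "FUN0402007", "regex"),
    ("0P2117294027213", "FUN04", "mcc"),
    ("0P2117293840924", "FUN0402007", "regex"),
]

def check_labels(rows, prefix_len=5):
    """Hypothetical helper: map each transaction_id to True/False depending on
    whether the mcc label equals the first `prefix_len` characters of the regex
    label, or to None when one of the two labels is missing."""
    # Group labels by transaction_id, keyed by module name
    # (assumes one label per module; a duplicate would overwrite the earlier one)
    labels = defaultdict(dict)
    for tid, label, module in rows:
        labels[tid][module] = label

    result = {}
    for tid, by_module in labels.items():
        mcc = by_module.get("mcc")
        regex = by_module.get("regex")
        if mcc is None or regex is None:
            result[tid] = None  # nothing to compare against
        else:
            result[tid] = mcc == regex[:prefix_len]
    return result

print(check_labels(rows))
```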
Answer:
You could do a self join. For this, it is best practice to give the dataframes aliases. Then you can create a column for the check:
Input:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('0P2117292543428', 'EDU', 'mcc'),
( '0P211729824944', 'EDU', 'mcc'),
( '0P31172950208', 'EDU', 'mcc'),
('0P2117294027213', 'FUN0402007', 'regex'),
('0P2117294027213', 'FUN04', 'mcc'),
('0P2117293581427', 'FUN0402007', 'regex'),
('0P2117293581427', 'FUN04', 'mcc'),
('0P2117292967336', 'FUN0402007', 'regex'),
('0P2117292967336', 'FUN04', 'mcc'),
('0P2117292659416', 'FUN0402007', 'regex'),
('0P2117292659416', 'FUN04', 'mcc'),
('0P2117293159304', 'FUN0402007', 'regex'),
('0P2117293159304', 'FUN04', 'mcc'),
('0P2117293237687', 'FUN0402007', 'regex'),
('0P2117293237687', 'FUN04', 'mcc'),
('0P2117293548610', 'FUN0402007', 'regex'),
('0P2117293548610', 'FUN04', 'mcc'),
('0P2117293678239', 'FUN0402007', 'regex'),
('0P2117293678239', 'FUN04', 'mcc'),
('0P2117293840924', 'FUN0402007', 'regex')],
['transaction_id', 'transaction_label', 'module_name'])
Script:
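# Keep the mcc rows (alias m) and join them to the regex rows (alias r)
# that share the same transaction_id; the join is inner by default.
df = (df.filter("module_name = 'mcc'").alias('m')
      .join(df.filter("module_name = 'regex'").alias('r'), 'transaction_id')
      # True when the mcc label equals the first 5 characters of the regex label
      .withColumn('check', F.col('m.transaction_label') == F.substring('r.transaction_label', 1, 5))
     )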
df.show()
# +---------------+-----------------+-----------+-----------------+-----------+-----+
# | transaction_id|transaction_label|module_name|transaction_label|module_name|check|
# +---------------+-----------------+-----------+-----------------+-----------+-----+
# |0P2117292659416| FUN04| mcc| FUN0402007| regex| true|
# |0P2117292967336| FUN04| mcc| FUN0402007| regex| true|
# |0P2117293159304| FUN04| mcc| FUN0402007| regex| true|
# |0P2117293237687| FUN04| mcc| FUN0402007| regex| true|
# |0P2117293548610| FUN04| mcc| FUN0402007| regex| true|
# |0P2117293581427| FUN04| mcc| FUN0402007| regex| true|
# |0P2117293678239| FUN04| mcc| FUN0402007| regex| true|
# |0P2117294027213| FUN04| mcc| FUN0402007| regex| true|
# +---------------+-----------------+-----------+-----------------+-----------+-----+