Home > Blockchain >  Compare different rows in dataframe containing the same id
Compare different rows in dataframe containing the same id

Time:09-06

I have a Spark dataframe as follows:

spark.sql("""
    SELECT * from all_trans
    """).show()

 --------------- ------------- ----------- ------------------ ----------------- ----------- ---------- 
| transaction_id|card_event_id|card_pos_id|card_point_country|transaction_label|module_name| post_date|
 --------------- ------------- ----------- ------------------ ----------------- ----------- ---------- 
|0P2117292543428|   2502723025|       null|                CZ|              EDU|        mcc|2022-02-10|
| 0P211729824944|   2502723477|       null|                CZ|              EDU|        mcc|2022-02-10|
|  0P31172950208|   2502723587|       null|                CZ|              EDU|        mcc|2022-02-10|
|0P2117294027213|   2502726454|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117294027213|   2502726454|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117293581427|   2502729360|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117293581427|   2502729360|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117292967336|   2502729724|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117292967336|   2502729724|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117292659416|   2502730642|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117292659416|   2502730642|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117293159304|   2502731764|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117293159304|   2502731764|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117293237687|   2502732381|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117293237687|   2502732381|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117293548610|   2502733071|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117293548610|   2502733071|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117293678239|   2502736684|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
|0P2117293678239|   2502736684|   E3KB2938|                CZ|            FUN04|        mcc|2022-02-10|
|0P2117293840924|   2502737447|   E3KB2938|                CZ|       FUN0402007|      regex|2022-02-10|
 --------------- ------------- ----------- ------------------ ----------------- ----------- ---------- 

One transaction_id can have more than 1 transaction_label. I want to be able to go automatically through all transaction_label in the dataframe for each transaction_id and compare whether they match on some level.

I had in mind logic like:

df.foreach(lambda x:
(transaction_id.transaction_label where module_name=='mcc') == (left(transaction_id.transaction_label, 5) where module_name=='regex')

But I don't know how to compare same transaction_id in Spark. Conversion to Pandas fails due to limited driver memory.

CodePudding user response:

You could do a self join. For this, best practice is to provide alias to dataframes. Then you could create a column for the check:

Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('0P2117292543428',               'EDU',         'mcc'),
     ( '0P211729824944',               'EDU',         'mcc'),
     (  '0P31172950208',               'EDU',         'mcc'),
     ('0P2117294027213',        'FUN0402007',       'regex'),
     ('0P2117294027213',             'FUN04',         'mcc'),
     ('0P2117293581427',        'FUN0402007',       'regex'),
     ('0P2117293581427',             'FUN04',         'mcc'),
     ('0P2117292967336',        'FUN0402007',       'regex'),
     ('0P2117292967336',             'FUN04',         'mcc'),
     ('0P2117292659416',        'FUN0402007',       'regex'),
     ('0P2117292659416',             'FUN04',         'mcc'),
     ('0P2117293159304',        'FUN0402007',       'regex'),
     ('0P2117293159304',             'FUN04',         'mcc'),
     ('0P2117293237687',        'FUN0402007',       'regex'),
     ('0P2117293237687',             'FUN04',         'mcc'),
     ('0P2117293548610',        'FUN0402007',       'regex'),
     ('0P2117293548610',             'FUN04',         'mcc'),
     ('0P2117293678239',        'FUN0402007',       'regex'),
     ('0P2117293678239',             'FUN04',         'mcc'),
     ('0P2117293840924',        'FUN0402007',       'regex')],
    ['transaction_id', 'transaction_label', 'module_name'])

Script:

df = (df.filter("module_name = 'mcc'").alias('m')
    .join(df.filter("module_name = 'regex'").alias('r'), 'transaction_id')
    .withColumn('check', F.col('m.transaction_label') == F.substring('r.transaction_label', 1, 5))
)
df.show()
#  --------------- ----------------- ----------- ----------------- ----------- ----- 
# | transaction_id|transaction_label|module_name|transaction_label|module_name|check|
#  --------------- ----------------- ----------- ----------------- ----------- ----- 
# |0P2117292659416|            FUN04|        mcc|       FUN0402007|      regex| true|
# |0P2117292967336|            FUN04|        mcc|       FUN0402007|      regex| true|
# |0P2117293159304|            FUN04|        mcc|       FUN0402007|      regex| true|
# |0P2117293237687|            FUN04|        mcc|       FUN0402007|      regex| true|
# |0P2117293548610|            FUN04|        mcc|       FUN0402007|      regex| true|
# |0P2117293581427|            FUN04|        mcc|       FUN0402007|      regex| true|
# |0P2117293678239|            FUN04|        mcc|       FUN0402007|      regex| true|
# |0P2117294027213|            FUN04|        mcc|       FUN0402007|      regex| true|
#  --------------- ----------------- ----------- ----------------- ----------- ----- 
  • Related