PySpark query performance - join vs when


I have a query where I want to transform the contents of two dataframes (purDf, ~432.8 MiB, and ticDf, ~9.3 GiB) based on some conditions and then join them.

I've tried doing it in two different ways, with very different performance. Alternative 1 relies on a long chain of when statements, while Alternative 2 uses joins against small lookup dataframes to minimize the number of when statements in the query.

Alternative 1

from pyspark.sql.functions import col, when, substring, coalesce, lit

# All of the remapping is done with when/otherwise expressions
(purDf
 .select(col('id').alias('TicketEventId'),
         col('ticketId').alias('TicketId'),
         col('ticketEventType').alias('TicketEventType'),
         when(col('ticketEventType') == 'ticketRefunded', col('amountInclVat') * -1)
             .otherwise(col('amountInclVat'))
             .alias('AmountInclVAT'),
         col('created_datetime').alias('CreatedAtLocal'))
 .join(ticDf
       .select(when(substring(col('saleschannel_id'), 0, 2) == 'Af', 'Ombud')
               .when(substring(col('saleschannel_id'), 0, 2) == 'Kc', 'Kundcenter')
               .when(substring(col('saleschannel_id'), 0, 2) == 'Rb', 'Regionbuss')
               .when(substring(col('saleschannel_id'), 0, 2) == 'Do', 'Do')
               .when(substring(col('saleschannel_id'), 0, 2) == 'MK', 'Mitt konto')
               .when(col('saleschannel_id') == 'tvm', 'Biljettautomater')
               .when(col('saleschannel_id') == 'washington', 'appen')
               .when(col('saleschannel_id') == 'EComPublic', 'Köp & Skicka')
               .when(col('saleschannel_id') == 'tapnride', 'Blippa')
               .when(col('saleschannel_id') == 'Senior', 'Seniorresor')
               .when(col('saleschannel_id') == 'Skola', 'Skola')
               .when(col('saleschannel_id') == 'Service', 'Serviceresor')
               .when(col('saleschannel_id') == '2', 'FrontOffice')
               .otherwise('Övrigt')
               .alias('SalesChannel'),
               col('ticketId').alias('TicketId')),
       'TicketId', how='inner'))

Alternative 1 completed the command in ~40 seconds.

Alternative 2

stringCodesSubstr = spark.createDataFrame([{'saleschannel_id_substr': 'Af', 'tempId1': 'Ombud'},
                                           {'saleschannel_id_substr': 'Kc', 'tempId1': 'Kundcenter'},
                                           {'saleschannel_id_substr': 'Rb', 'tempId1': 'Regionbuss'},
                                           {'saleschannel_id_substr': 'Do', 'tempId1': 'Do'},
                                           {'saleschannel_id_substr': 'MK', 'tempId1': 'Mitt konto'}])

stringCodes = spark.createDataFrame([{'saleschannel_id': 'tvm', 'tempId2': 'Biljettautomater'},
                                     {'saleschannel_id': 'washington', 'tempId2': 'appen'},
                                     {'saleschannel_id': 'EComPublic', 'tempId2': 'Köp & Skicka'},
                                     {'saleschannel_id': 'tapnride', 'tempId2': 'Blippa'},
                                     {'saleschannel_id': 'Senior', 'tempId2': 'Seniorresor'},
                                     {'saleschannel_id': 'Skola', 'tempId2': 'Skola'},
                                     {'saleschannel_id': 'Service', 'tempId2': 'Serviceresor'},
                                     {'saleschannel_id': '2', 'tempId2': 'FrontOffice'}])

ticketsPreProcess = (ticDf
           .select(substring(col('saleschannel_id'),0,2).alias('saleschannel_id_substr'),
                   col('saleschannel_id'),
                   col('ticketId')
           )
          )

tickets = (ticketsPreProcess.join(stringCodesSubstr, ['saleschannel_id_substr'] , how = 'left')
                                       .join(stringCodes, ['saleschannel_id'], how = 'left')
                                       .select(coalesce( col('tempId1'), col('tempId2'), lit('Övrigt') ).alias('SalesChannel'),
                                              col('ticketId')
                                              )
           )

purchaseTemp = purDf.join(tickets, ['ticketId'], how = 'inner') 

purchaseView = (purchaseTemp.select(
                                      col('id').alias('TicketEventId'),
                                      col('ticketId').alias('TicketId'),
                                      col('ticketEventType').alias('TicketEventType'),
                                      when( col('ticketEventType') == 'ticketRefunded', col('amountInclVat')*-1)
                                        .otherwise(col('amountInclVat'))
                                        .alias('AmountInclVAT'),
                                      col('created_datetime').alias('CreatedAtLocal'),
                                      col('SalesChannel'))
                                      )

Alternative 2 completed the command in ~22 seconds.

I'm using Databricks Runtime 10.3 (includes Apache Spark 3.2.1, Scala 2.12) and a cluster of 4 nodes with 32 GB of memory and 4 cores each.

My question is: what is the reason for this drastic difference in performance? I can see in the Spark UI that Alternative 2 seems to be able to run more things in parallel, but I don't understand why.

I've tried reading the physical plans, but since I'm new to Spark I can't make any sense of them. If anyone can help me with that, see the plans below.
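
(Plans like these can be printed without running the query via DataFrame.explain(); a minimal sketch, with `query` standing for either alternative above:)

query.explain()                  # prints the physical plan only, in the format shown below
query.explain(mode='formatted')  # Spark 3.0+: numbered operators plus a details section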

Alternative 1

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [TicketId#234, TicketEventId#233, TicketEventType#235, AmountInclVAT#236, CreatedAtLocal#237, SalesChannel#243]
   +- SortMergeJoin [TicketId#234], [TicketId#244], Inner
      :- Sort [TicketId#234 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(TicketId#234, 200), ENSURE_REQUIREMENTS, [id=#416]
      :     +- Project [id#90 AS TicketEventId#233, ticketId#91 AS TicketId#234, ticketEventType#92 AS TicketEventType#235, CASE WHEN (ticketEventType#92 = ticketRefunded) THEN (amountInclVat#94 * -1.0) ELSE amountInclVat#94 END AS AmountInclVAT#236, created_datetime#95 AS CreatedAtLocal#237]
      :        +- Filter ((isnotnull(created_datetime#95) AND (created_datetime#95 >= 2022-04-01 00:00:00)) AND isnotnull(ticketId#91))
      :           +- FileScan parquet [id#90,ticketid#91,ticketEventType#92,amountInclVat#94,created_datetime#95] Batched: true, DataFilters: [isnotnull(created_datetime#95), (created_datetime#95 >= 2022-04-01 00:00:00), isnotnull(ticketid..., Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/mnt/datalake/gold/purchases], PartitionFilters: [], PushedFilters: [IsNotNull(created_datetime), GreaterThanOrEqual(created_datetime,2022-04-01 00:00:00.0), IsNotNu..., ReadSchema: struct<id:string,ticketid:string,ticketEventType:string,amountInclVat:double,created_datetime:tim...
      +- Sort [TicketId#244 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(TicketId#244, 200), ENSURE_REQUIREMENTS, [id=#417]
            +- Project [CASE WHEN (ephemeralsubstring(saleschannel_id#170, 0, 2) = Af) THEN Ombud WHEN (ephemeralsubstring(saleschannel_id#170, 0, 2) = Kc) THEN Kundcenter WHEN (ephemeralsubstring(saleschannel_id#170, 0, 2) = Rb) THEN Regionbuss WHEN (ephemeralsubstring(saleschannel_id#170, 0, 2) = Do) THEN Do WHEN (ephemeralsubstring(saleschannel_id#170, 0, 2) = MK) THEN Mitt konto WHEN (saleschannel_id#170 = tvm) THEN Biljettautomater WHEN (saleschannel_id#170 = washington) THEN SKÅ-appen WHEN (saleschannel_id#170 = EComPublic) THEN Köp & Skicka WHEN (saleschannel_id#170 = tapnride) THEN Blippa WHEN (saleschannel_id#170 = Senior) THEN Seniorresor WHEN (saleschannel_id#170 = Skola) THEN Skola WHEN (saleschannel_id#170 = Service) THEN Serviceresor WHEN (saleschannel_id#170 = 2) THEN Singapore FrontOffice ELSE Övrigt END AS SalesChannel#243, ticketId#154 AS TicketId#244]
               +- Filter ((isnotnull(created_datetime#159) AND (created_datetime#159 >= 2022-04-01 00:00:00)) AND isnotnull(ticketId#154))
                  +- FileScan parquet [ticketId#154,created_datetime#159,saleschannel_id#170] Batched: true, DataFilters: [isnotnull(created_datetime#159), (created_datetime#159 >= 2022-04-01 00:00:00), isnotnull(ticket..., Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/mnt/datalake/gold/tickets], PartitionFilters: [], PushedFilters: [IsNotNull(created_datetime), GreaterThanOrEqual(created_datetime,2022-04-01 00:00:00.0), IsNotNu..., ReadSchema: struct<ticketId:string,created_datetime:timestamp,saleschannel_id:string>

Alternative 2

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#90 AS TicketEventId#1132, ticketId#91 AS TicketId#1133, ticketEventType#92 AS TicketEventType#1134, CASE WHEN (ticketEventType#92 = ticketRefunded) THEN (amountInclVat#94 * -1.0) ELSE amountInclVat#94 END AS AmountInclVAT#1135, created_datetime#95 AS CreatedAtLocal#1136, SalesChannel#1122]
   +- SortMergeJoin [ticketid#91], [ticketId#154], Inner
      :- Sort [ticketid#91 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(ticketid#91, 200), ENSURE_REQUIREMENTS, [id=#785]
      :     +- Filter ((isnotnull(created_datetime#95) AND (created_datetime#95 >= 2022-04-01 00:00:00)) AND isnotnull(ticketid#91))
      :        +- FileScan parquet [id#90,ticketid#91,ticketEventType#92,amountInclVat#94,created_datetime#95] Batched: true, DataFilters: [isnotnull(created_datetime#95), (created_datetime#95 >= 2022-04-01 00:00:00), isnotnull(ticketid..., Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/mnt/datalake/gold/purchases], PartitionFilters: [], PushedFilters: [IsNotNull(created_datetime), GreaterThanOrEqual(created_datetime,2022-04-01 00:00:00.0), IsNotNu..., ReadSchema: struct<id:string,ticketid:string,ticketEventType:string,amountInclVat:double,created_datetime:tim...
      +- Sort [ticketId#154 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(ticketId#154, 200), ENSURE_REQUIREMENTS, [id=#786]
            +- Project [coalesce(tempId1#1102, tempId2#1106, Övrigt) AS SalesChannel#1122, ticketId#154]
               +- SortMergeJoin [saleschannel_id#170], [saleschannel_id#1105], LeftOuter
                  :- Sort [saleschannel_id#170 ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(saleschannel_id#170, 200), ENSURE_REQUIREMENTS, [id=#777]
                  :     +- Project [saleschannel_id#170, ticketId#154, tempId1#1102]
                  :        +- SortMergeJoin [saleschannel_id_substr#1109], [saleschannel_id_substr#1101], LeftOuter
                  :           :- Sort [saleschannel_id_substr#1109 ASC NULLS FIRST], false, 0
                  :           :  +- Exchange hashpartitioning(saleschannel_id_substr#1109, 200), ENSURE_REQUIREMENTS, [id=#769]
                  :           :     +- Project [substring(saleschannel_id#170, 0, 2) AS saleschannel_id_substr#1109, saleschannel_id#170, ticketId#154]
                  :           :        +- Filter ((isnotnull(created_datetime#159) AND (created_datetime#159 >= 2022-04-01 00:00:00)) AND isnotnull(ticketId#154))
                  :           :           +- FileScan parquet [ticketId#154,created_datetime#159,saleschannel_id#170] Batched: true, DataFilters: [isnotnull(created_datetime#159), (created_datetime#159 >= 2022-04-01 00:00:00), isnotnull(ticket..., Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/mnt/datalake/gold/tickets], PartitionFilters: [], PushedFilters: [IsNotNull(created_datetime), GreaterThanOrEqual(created_datetime,2022-04-01 00:00:00.0), IsNotNu..., ReadSchema: struct<ticketId:string,created_datetime:timestamp,saleschannel_id:string>
                  :           +- Sort [saleschannel_id_substr#1101 ASC NULLS FIRST], false, 0
                  :              +- Exchange hashpartitioning(saleschannel_id_substr#1101, 200), ENSURE_REQUIREMENTS, [id=#770]
                  :                 +- Filter isnotnull(saleschannel_id_substr#1101)
                  :                    +- Scan ExistingRDD[saleschannel_id_substr#1101,tempId1#1102]
                  +- Sort [saleschannel_id#1105 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(saleschannel_id#1105, 200), ENSURE_REQUIREMENTS, [id=#778]
                        +- Filter isnotnull(saleschannel_id#1105)
                           +- Scan ExistingRDD[saleschannel_id#1105,tempId2#1106]

Answer:

After some further testing I found the reason why Alternative 1 was slower than Alternative 2.

Before I ran the code shown above, I defined the dataframes purDf and ticDf without actually using them. Due to Spark's lazy evaluation, the dataframes were not actually loaded until they were needed for the first time. Since I always ran Alternative 1 before Alternative 2, Alternative 1 appeared slower because it also paid the cost of loading the data for both dataframes.

After re-running the tests, making sure that the dataframes had been loaded and cached beforehand, I found that Alternative 1 was actually the better-performing one. That makes sense to me, since the when statements can be evaluated on each partition independently, whereas the extra joins in Alternative 2 require shuffling and sorting before they can be performed.
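
A minimal sketch of that warm-up, assuming the same purDf and ticDf as above (cache() only marks a dataframe for caching, so an action such as count() is needed to actually materialize it):

# Load and cache both inputs before timing either alternative
purDf.cache()
ticDf.cache()
purDf.count()  # count() is an action, so it triggers the scan and populates the cache
ticDf.count()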

I managed to improve the performance of Alternative 2 slightly by using a broadcast join when creating the dataframe called tickets, although Alternative 1 was still the best-performing one.
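
One way to express that change, as a sketch: wrapping the small lookup dataframes in broadcast() hints Spark to ship them to every executor, which turns the two sort-merge joins into broadcast hash joins and avoids shuffling the large ticket dataframe on the saleschannel columns:

from pyspark.sql.functions import broadcast, coalesce, col, lit

tickets = (ticketsPreProcess
           .join(broadcast(stringCodesSubstr), ['saleschannel_id_substr'], how='left')
           .join(broadcast(stringCodes), ['saleschannel_id'], how='left')
           .select(coalesce(col('tempId1'), col('tempId2'), lit('Övrigt')).alias('SalesChannel'),
                   col('ticketId')))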
