Why does Spark re-sort the data when the join of the two tables are bucketed and sorted the same way

I'm doing a simple join of two tables.

SELECT
    a.user_id
FROM
    table1 a
    FULL OUTER JOIN
    table2 b
        ON a.user_id = b.user_id

Both table1 and table2 are bucketed and sorted by user_id into the same number of buckets, using the following code.

df.write
    .mode("overwrite")
    .format("parquet")
    .bucketBy(4, "user_id")
    .sortBy("user_id")
    .option("path", "some/path/to/data/")
    .option("compression", "snappy")
    .saveAsTable("table1")

When I look at the execution plan, I see that Spark still performs a Sort step after each FileScan, which I don't think should be necessary.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#4483L]
   +- SortMergeJoin [user_id#4483L], [user_id#4485L], FullOuter
      :- Sort [user_id#4483L ASC NULLS FIRST], false, 0
      :  +- FileScan parquet default.table1[user_id#4483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4
      +- Sort [user_id#4485L ASC NULLS FIRST], false, 0
         +- FileScan parquet default.table2[user_id#4485L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://example/path_to_table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 4 out of 4

Previously, when I didn't bucket and sort the tables, the execution plan also included an Exchange step. Bucketing has eliminated the Exchange, which is definitely good, but I was hoping it would eliminate the Sort step as well.

Thanks.

Answer: Summarized from Pradeep yadav's answer

I need to repartition the data by the bucket column before saveAsTable(). Without the repartition, each writing task creates its own file for every bucket, so a single bucket ends up spread across multiple files; sortBy() only guarantees order within each file, not across the whole bucket, which is why Spark re-sorts at read time. Repartitioning into the same number of partitions as buckets produces exactly one file per bucket, so the planner can rely on the on-disk sort order.

import org.apache.spark.sql.functions.col

df.repartition(4, col("user_id"))  // one output partition per bucket
    .write
    .mode("overwrite")
    .format("parquet")
    .bucketBy(4, "user_id")
    .sortBy("user_id")
    .option("path", "some/path/to/data/")
    .option("compression", "snappy")
    .saveAsTable("table1")

CodePudding user response:

You can read this detailed article: https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53, especially the section "What about sort?"; it should answer your question.
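
After rewriting both tables with the repartition, the plan can also be re-checked programmatically; a rough sanity test that string-matches the plan text (brittle, for illustration only):

// The executed plan should no longer contain any Sort nodes above the
// bucketed FileScans once each bucket is a single, fully sorted file.
val plan = spark.sql("""
    SELECT a.user_id
    FROM table1 a
    FULL OUTER JOIN table2 b
        ON a.user_id = b.user_id
""").queryExecution.executedPlan.toString
println(if (plan.contains("Sort [")) "still re-sorting" else "no re-sort needed")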
