Global sort in Flink 1.14 with Table and DataStream APIs


I have been looking at different options for globally sorting bounded data in Flink 1.14. Many of the questions about this on Stack Overflow and other sites are several years old, refer to deprecated APIs, or don't answer the question fully. Since Flink is developing rapidly, I wanted to ask about the options available in the latest stable release (1.14).

Below is my understanding of the current situation (which may be wrong), along with my questions. Flink has two APIs, DataStream and Table, each of which can run in batch or streaming execution mode; the DataSet API is deprecated.

Batch Execution

  • Table API: It has an order-by operator, which I have used to sort bounded data, but its parallelism does not seem to go above 1. I tried the table.exec.resource.default-parallelism property, which increases the parallelism of operators such as group-by, but it did not increase order-by's parallelism (see the first sketch after this list). Question: is order-by's parallelism limited to 1 when doing a global sort on bounded data in batch mode? If so, how does Flink handle large data? Does it spill to disk?

  • DataStream API: I did not find an explicit API for sorting. The only approach I can think of is to range-partition the data and sort each partition locally, e.g. with a custom reduce function that outputs sorted partitions (see the second sketch after this list). But to get a globally sorted result, those partitions again have to be passed through an operator with a parallelism of 1, which brings back the data-size issues mentioned above.
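
Roughly the kind of Table API setup I mean is sketched below. It is a minimal sketch: the table name, schema, and datagen options are placeholders, and the parallelism value is arbitrary.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchOrderByExample {
    public static void main(String[] args) {
        // Table environment in batch execution mode
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Raises the default parallelism of operators such as group-by;
        // in my tests the final sort still ran with parallelism 1.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.default-parallelism", "8");

        // Bounded demo source (name and options are illustrative)
        tEnv.executeSql(
                "CREATE TABLE numbers (v INT) WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'number-of-rows' = '1000')");

        // Global sort of the bounded input via ORDER BY
        tEnv.executeSql("SELECT v FROM numbers ORDER BY v").print();
    }
}
```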
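
And here is a rough sketch of the DataStream idea described above: assign each record to a range bucket, sort each bucket in parallel with a merge-style reduce, and leave the final concatenation of buckets to a parallelism-1 step. The source, bucket width, and class names are placeholders, and each bucket's list has to fit in memory, which is part of the problem.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RangeSortSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(4);

        // Bounded demo input; a real job would pick range boundaries by sampling the data.
        DataStream<Integer> values = env.fromSequence(0, 9_999)
                .map(Long::intValue)
                .returns(Types.INT);

        DataStream<List<Integer>> sortedBuckets = values
                // Wrap each record in a singleton list so the reduce below can merge lists.
                .map(v -> {
                    List<Integer> l = new ArrayList<>();
                    l.add(v);
                    return l;
                })
                .returns(Types.LIST(Types.INT))
                // Range-partition: 0..2499 -> bucket 0, 2500..4999 -> bucket 1, ...
                .keyBy(l -> l.get(0) / 2_500, Types.INT)
                // Merge-sort the lists of each bucket; in BATCH execution mode only the
                // final, fully sorted list per bucket is emitted.
                .reduce(RangeSortSketch::mergeSorted);

        // To obtain one globally sorted sequence, the sorted buckets still have to be
        // concatenated in bucket order by a downstream step with parallelism 1.
        sortedBuckets.print();

        env.execute("range-partitioned sort sketch");
    }

    // Standard merge of two already-sorted lists.
    private static List<Integer> mergeSorted(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(a.get(i) <= b.get(j) ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }
}
```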

Streaming Execution

  • Table API: The order-by operator requires a time attribute when running in streaming mode (see the sketch after this list), so I was unable to use it to sort bounded data.

  • DataStream API: The options are the same as for the DataStream API in batch execution mode.
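
For completeness, the kind of ORDER BY that the streaming planner does accept looks roughly like the sketch below, with the leading sort key being a time attribute; the table name, schema, and datagen options are again placeholders.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StreamingOrderByExample {
    public static void main(String[] args) {
        // Table environment in streaming execution mode
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'ts' is a processing-time attribute; the schema and options are illustrative.
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + " v INT,"
                        + " ts AS PROCTIME()"
                        + ") WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'number-of-rows' = '20')");

        // In streaming mode the leading ORDER BY key must be an ascending time
        // attribute, so ordering by an arbitrary column such as 'v' alone is rejected.
        tEnv.executeSql("SELECT v, ts FROM events ORDER BY ts").print();
    }
}
```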

Overall, I am unable to find a truly parallel implementation for sorting bounded datasets in Flink. Am I correct in my findings above?

CodePudding user response:

Given how Flink is organized, for batch I think the best that's possible is to sort partitions of the data in parallel and then merge those sorted partitions; that final merge step cannot be done in parallel. I don't know whether the Table/SQL API does anything like this automatically, but after a quick look at the source code I suspect it might.

You could ask about this on the flink user mailing list (https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list).

For more insight on how batch workloads are executed by the SQL planner and how to tune them, I recommend https://flink.apache.org/2021/10/26/sort-shuffle-part1.html.
