I have been looking at the options for globally sorting bounded data in Flink 1.14. Many questions about this on Stack Overflow and other sites are several years old, cover deprecated APIs, or don't answer the question fully. Since Flink is developing rapidly, I wanted to ask about the options available in the latest stable Flink (1.14).
Following is how I understand the current scenario (which may be wrong); my questions are attached. Flink has two APIs - `DataStream` and `Table` - which may run in `batch` or `streaming` execution mode. The `DataSet` API is deprecated.
Batch Execution
Table API: It has an `order-by` operator, which I have used to sort bounded data. But its parallelism apparently cannot go above 1. I tried the `table.exec.resource.default-parallelism` property, which increases the parallelism of operators like `group-by`, but it did not increase `order-by`'s parallelism. Question: is `order-by`'s parallelism limited to 1 when doing a global sort on bounded data in batch mode? How does Flink handle large data then? Does it spill to disk?

DataStream API: I didn't find any explicit API for sorting. The only way I can think of is to first partition by range and sort each partition locally using a custom reduce function. The reduce function outputs sorted partitions, but to get a globally sorted result we again have to pass the result through an operator with parallelism 1. This brings back the data-size issues mentioned above.
Streaming Execution
Table API: In streaming mode, the `order-by` operator requires the sort to be on a time attribute. I was unable to use it for sorting bounded data.
DataStream API: The options are the same as for the DataStream API in batch execution mode.
Overall, I am unable to find a truly parallel implementation for sorting bounded datasets in Flink. Am I correct in my findings above?
CodePudding user response:
Given how Flink is organized, for batch I think the best that is possible is to sort partitions of the data in parallel and then merge those sorted partitions; that final merge step cannot be done in parallel. I don't know whether the Table/SQL API does anything like this automatically, but after a quick look at the source code, I suspect it might.
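The sort-then-merge structure can be sketched in plain Python (not Flink API; the partition contents are invented). Each partition is sorted independently, which is the parallelizable part, and a single k-way merge then produces the global order:

```python
import heapq

# Hypothetical locally sorted partitions, as parallel sort tasks might emit them.
partitions = [
    sorted([42, 7, 63]),
    sorted([99, 3, 21]),
    sorted([58, 14, 76]),
]

# The final k-way merge runs in a single task (parallelism 1), but it only
# streams through already-sorted runs rather than re-sorting everything,
# so it is O(n log k) with small constant memory per input run.
merged = list(heapq.merge(*partitions))
print(merged)  # [3, 7, 14, 21, 42, 58, 63, 76, 99]
```

This is why the final step is cheap relative to the sort itself, even though it is serial.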
You could ask about this on the flink user mailing list (https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list).
For more insight on how batch workloads are executed by the SQL planner and how to tune them, I recommend https://flink.apache.org/2021/10/26/sort-shuffle-part1.html.