Let's say I have an input Flux<String> containing many strings (billions of them), like this:
- apple
- application
- bible
- book
There are billions of such strings, so they won't fit into memory; that's why I want to use the reactive approach.
The stream is sorted. What I want is to create a Flux of ordered groups of strings, grouped by their first 3 characters:
- app: apple, application
- bib: bible
- boo: book
This Flux ends up in an HTTP response, which means that all "app" items must be written out before the "bib" items begin.
Without Flux, I could exploit the ordering and collect the items into a prepared bucket as they come (the number of strings per bucket does fit into memory): whenever the prefix changes, I flush the bucket and start collecting the new prefix. The big advantage of the stream being ordered is that once I encounter a new prefix, I know the old one will never come again.
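The bucket approach described above can be sketched in plain Java, without Reactor. This is a minimal sketch, assuming the input arrives as a sorted Iterator<String>; the names PrefixBuckets, prefixOf and groupSorted are hypothetical, and using the whole string as the key when it is shorter than 3 characters is an assumption. Only the bucket for the current prefix is held in memory.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class PrefixBuckets {

    // Hypothetical key function: first 3 characters, or the whole string if shorter.
    static String prefixOf(String s) {
        return s.length() < 3 ? s : s.substring(0, 3);
    }

    // Walks a SORTED iterator and flushes one bucket per key.
    // Only the current bucket is kept in memory.
    static void groupSorted(Iterator<String> sorted,
                            Function<String, String> keyOf,
                            BiConsumer<String, List<String>> flush) {
        String currentKey = null;
        List<String> bucket = new ArrayList<>();
        while (sorted.hasNext()) {
            String item = sorted.next();
            String key = keyOf.apply(item);
            if (currentKey != null && !currentKey.equals(key)) {
                // Input is sorted, so currentKey can never reappear: flush it now.
                flush.accept(currentKey, bucket);
                bucket = new ArrayList<>();
            }
            currentKey = key;
            bucket.add(item);
        }
        if (currentKey != null) {
            flush.accept(currentKey, bucket); // flush the last group
        }
    }

    public static void main(String[] args) {
        List<String> input = List.of("apple", "application", "bible", "book");
        groupSorted(input.iterator(), PrefixBuckets::prefixOf,
                (key, items) -> System.out.println(key + ": " + String.join(", ", items)));
    }
}
```

Running main prints the three groups from the question (app, bib, boo) in input order.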
But with Flux I don't know how to do this. .groupBy() will return a Flux of Flux (more precisely, a Flux of GroupedFlux), but I don't think this will work when trying to serialize it to the HTTP response output stream.
Answer:
This is pretty much a textbook use case for windowUntilChanged().
In your case, the "key" you want to extract is the first 3 letters of the string, so you can do something like flux.windowUntilChanged(str -> str.substring(0,3)), which will give you a Flux<Flux<String>>, where each inner Flux starts and finishes when the first 3 letters of your strings change. You may want to add some additional logic to deal with words shorter than 3 characters, of course (substring(0,3) throws a StringIndexOutOfBoundsException on them), but I'll leave that as an exercise for the reader :-)
(I know you've mentioned it in the question, but just for clarification and for the sake of anyone else finding this answer: this will only work if the incoming elements on the stream are already sorted alphabetically.)
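A minimal end-to-end sketch of the windowUntilChanged() approach, assuming reactor-core 3.x is on the classpath. The class and method names (PrefixWindows, prefixOf, groupLines) and the "prefix: item, item" line format are hypothetical; treating strings shorter than 3 characters as their own key is one possible answer to the exercise above. Each window is collected into a list, relying on the question's statement that a single group fits into memory, and concatMap subscribes to one window at a time so the groups stay in order.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class PrefixWindows {

    // Hypothetical key function: first 3 characters, or the whole string if shorter,
    // which avoids the StringIndexOutOfBoundsException from substring(0, 3).
    static String prefixOf(String s) {
        return s.length() < 3 ? s : s.substring(0, 3);
    }

    // Turns a SORTED Flux<String> into one line per prefix, e.g. "app: apple, application".
    // On unsorted input the same prefix could show up in several windows.
    static Flux<String> groupLines(Flux<String> sorted) {
        return sorted
                .windowUntilChanged(PrefixWindows::prefixOf)
                // concatMap handles one window at a time, preserving group order
                .concatMap(window -> window
                        .collectList() // one group at a time is buffered in memory
                        .filter(items -> !items.isEmpty()) // defensively skip empty windows
                        .map(items -> prefixOf(items.get(0)) + ": " + String.join(", ", items)));
    }

    public static void main(String[] args) {
        groupLines(Flux.just("apple", "application", "bible", "book"))
                .doOnNext(System.out::println)
                .blockLast();
    }
}
```

Because the result is a flat Flux<String> rather than a nested Flux, it can be written to a response stream element by element, and all "app" output is emitted before any "bib" output. Newer Reactor versions may also offer bufferUntilChanged(keySelector), which emits each group directly as a List; check whether your version has it.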