I'm receiving real-time data that can arrive out of order: some records arrive late compared to others. For later event processing I need to reorder the stream of flowfiles, and I'm trying to find out whether that's possible in NiFi.
My understanding of EnforceOrder is that it requires an incrementing integer as a flowfile attribute, and a known starting point.
Because I'm dealing with a real-time flow, I don't have a starting point, so I don't see how to make that work.
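To see why the starting point matters, here is a simplified Python model of integer-sequence ordering of the kind EnforceOrder performs. It is a sketch, not NiFi's implementation: `enforce_order` is a hypothetical name, `initial_order` stands in for the known starting point, and timeouts and the other routing the real processor does are ignored.

```python
def enforce_order(arrivals, initial_order):
    """Release order numbers strictly in sequence, starting at `initial_order`.

    Simplified model (hypothetical helper): anything that is not the next
    expected number is held back; timeouts are not modelled.
    Returns (released_in_order, still_waiting).
    """
    expected = initial_order
    waiting = set()
    released = []
    for order in arrivals:
        waiting.add(order)
        # Keep releasing while the next expected number has already arrived.
        while expected in waiting:
            waiting.remove(expected)
            released.append(expected)
            expected += 1
    return released, sorted(waiting)


# Consecutive integers with a known start: everything comes out in order.
print(enforce_order([2, 0, 1, 4, 3], initial_order=0))  # ([0, 1, 2, 3, 4], [])

# Epoch timestamps are not consecutive and the start is unknown: nothing is released.
print(enforce_order([1700000003, 1700000001], initial_order=0))  # ([], [1700000001, 1700000003])
```

Even with a correct starting timestamp, the gaps between timestamps would stall this kind of ordering at the first missing value.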
What I do have is a timestamp in the data that I can extract, convert to a Unix (epoch) timestamp, and write into the "priority" attribute. After that I set up a queue with Prioritizer = PriorityAttributePrioritizer. But the flowfiles don't stay in the queue long enough for the reordering to catch up, because all the processors after it are fast (I'm dealing with 1,200 flowfiles/sec).
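A small Python simulation illustrates the problem. It assumes the prioritized connection behaves like a min-heap on the numeric priority value, lower timestamps first. `drain` and `backlog` are hypothetical names: `backlog` is how many flowfiles sit in the queue before the downstream processor starts taking one per arrival.

```python
import heapq


def drain(arrivals, backlog):
    """Simulate a prioritized queue feeding a downstream processor.

    Each arrival is pushed onto a min-heap (lowest priority value first).
    Once more than `backlog` items are queued, the consumer takes one item
    per arrival; at the end of the stream the rest are flushed.
    """
    queue, out = [], []
    for ts in arrivals:
        heapq.heappush(queue, ts)
        if len(queue) > backlog:
            out.append(heapq.heappop(queue))
    while queue:
        out.append(heapq.heappop(queue))
    return out


arrivals = [1000, 1003, 1001, 1004, 1002, 1005]  # epoch seconds, out of order

# Consumer keeps up: the queue never holds more than one item, so nothing is reordered.
print(drain(arrivals, backlog=0))  # [1000, 1003, 1001, 1004, 1002, 1005]

# A standing backlog of 2 gives the prioritizer something to sort.
print(drain(arrivals, backlog=2))  # [1000, 1001, 1002, 1003, 1004, 1005]
```

A prioritizer can only reorder flowfiles that are in the queue at the same time, so the backlog has to cover the worst lateness in the data.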
a) Is there a way to make a queue act as a buffer that keeps the throughput but delays flowfiles just long enough that a fair number of them sit in the queue to be prioritized?
b) Otherwise, how else can an out-of-order stream be fixed?
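How many flowfiles such a buffer holds follows from Little's law: items queued ≈ arrival rate × time each item waits. A quick Python calculation at 1,200 flowfiles/s, compared against NiFi's default back-pressure object threshold of 10,000 flowfiles per connection (which can be changed per connection); `buffered_flowfiles` is a hypothetical helper.

```python
def buffered_flowfiles(rate_per_s, delay_s):
    """Little's law: average number queued = arrival rate x time spent waiting."""
    return rate_per_s * delay_s


RATE = 1200                    # flowfiles per second, from the question
DEFAULT_BACKPRESSURE = 10_000  # NiFi's default per-connection object threshold

for delay in (1, 5, 10):
    n = buffered_flowfiles(RATE, delay)
    print(f"delay {delay:>2} s -> ~{n} flowfiles queued, exceeds default: {n > DEFAULT_BACKPRESSURE}")
```

At this rate a 10-second delay already exceeds the default threshold, so the threshold on that connection would need raising; otherwise back pressure stops the processor feeding it.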
Answer:
I designed a "waiter" process group for this. In my case I had to wait 10 seconds to sort my flowfiles. The process group below basically circles flowfiles in a loop for a set period.
You can also apply your prioritizer to the unmatched queue, whose back-pressure threshold should be large enough to hold all the waiting flowfiles.
[Figure: the waiter process group, built from RouteOnAttribute and UpdateAttribute processors]
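The same idea outside NiFi, as a hedged sketch: a reorder buffer keyed by the priority timestamp that releases an event only once the newest timestamp seen is at least `max_lateness` ahead of it. This is a swapped-in generic technique (a bounded-lateness watermark), not a model of the RouteOnAttribute/UpdateAttribute loop; `reorder` and `max_lateness` are hypothetical names, and it assumes no event arrives more than `max_lateness` behind the newest one.

```python
import heapq
from typing import Iterable, Iterator


def reorder(timestamps: Iterable[float], max_lateness: float) -> Iterator[float]:
    """Yield timestamps in ascending order, assuming bounded lateness.

    Each event is held in a min-heap until the newest timestamp seen so far
    is at least `max_lateness` ahead of it (the "watermark"), then released.
    Events later than `max_lateness` can still come out of order.
    """
    heap: list[float] = []
    newest = float("-inf")
    for ts in timestamps:
        heapq.heappush(heap, ts)
        newest = max(newest, ts)
        watermark = newest - max_lateness
        while heap and heap[0] <= watermark:
            yield heapq.heappop(heap)
    # End of stream: flush whatever is still buffered.
    while heap:
        yield heapq.heappop(heap)


# Lateness of at most 10 (e.g. seconds): output is fully sorted.
print(list(reorder([0, 3, 1, 12, 5, 14, 13, 30], max_lateness=10)))  # [0, 1, 3, 5, 12, 13, 14, 30]

# An event 18 behind the newest one, with a bound of 2, slips through out of order.
print(list(reorder([0, 10, 20, 2], max_lateness=2)))  # [0, 10, 2, 20]
```

The trade-off matches the 10-second waiter: a larger bound tolerates later data, but adds that much latency and keeps rate × bound items buffered.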
Answer:
I have something that seems to work at low speed, but for the moment it breaks down at higher rates.
The first block is just a simulator that generates one flowfile per second. Its UpdateAttribute can be the one that generates the priority attribute used for sorting.
The rest handles bursts:
- Increment a counter for each flowfile
- Wait for the counter to reach a limit
- Once flowfiles are released, reset the counter
The first Notify increments the counter by 1 for each flowfile.
The Wait lets flowfiles through only once the counter reaches 5. The queue in front of the Wait uses PriorityAttributePrioritizer; all the other queues are set to FIFO.
The last Notify resets the counter to zero.
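The Notify/Wait/Notify gate can be modelled in a few lines of Python to reason about what ordering it can guarantee. This is a rough sketch under assumptions: `counter_gate` and `limit` are hypothetical names, the counter is a plain in-memory integer rather than a distributed-cache signal, and the gate is assumed to release everything queued once the count is reached.

```python
import heapq


def counter_gate(arrivals, limit):
    """Rough model of Notify(+1) -> Wait(count == limit) -> Notify(reset).

    Flowfiles queue in a prioritized connection (lowest timestamp first).
    Each arrival bumps the counter; when it reaches `limit`, the queued
    flowfiles are released in priority order and the counter is reset.
    Returns (released_in_order, still_queued).
    """
    queue, released, counter = [], [], 0
    for ts in arrivals:
        heapq.heappush(queue, ts)  # prioritized queue in front of the Wait
        counter += 1               # first Notify: +1 per flowfile
        if counter >= limit:       # Wait: target count reached, gate opens
            while queue:
                released.append(heapq.heappop(queue))
            counter = 0            # last Notify: reset to zero
    return released, sorted(queue)


print(counter_gate([3, 1, 2, 5, 4, 8, 6, 10, 9, 7, 11, 12], limit=5))
# ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12])

print(counter_gate([1, 2, 3, 4, 6, 5, 7, 8, 9, 10], limit=5))
# ([1, 2, 3, 4, 6, 5, 7, 8, 9, 10], [])
```

In this model sorting only happens within each batch of `limit` flowfiles: an item that lands just after a batch boundary, like 5 in the second call, stays out of order, and a partial batch waits until enough new flowfiles arrive.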