NiFi: how to handle out-of-order data when EnforceOrder is not applicable?

Time:02-25

I'm receiving real-time data that can arrive out of order: some records arrive late compared to others. For later event processing I need to reorder the stream of flowfiles, and I'm trying to see whether that's possible in NiFi.

My understanding of EnforceOrder is that it requires an incrementing integer as a flowfile attribute, plus a known starting point.

As I'm dealing with a real-time flow, I don't have a starting point, so I don't see how to make that work.

What I have is a timestamp in the data that I can extract, convert to a Unix timestamp, and write to the "priority" attribute. After that I set up a queue with Prioritizer = PriorityAttributePrioritizer. But the flowfiles don't stay in the queue long enough to correct the out-of-order issue, because all the processors after it are fast (I'm dealing with 1200 flowfiles/sec).

a) Is there a way to have a queue act as a buffer that keeps the throughput but delays flowfiles just enough for a fair number of them to accumulate in the queue and be prioritized?

b) Or how else can we fix an out-of-order issue?

CodePudding user response:

I designed a waiter process group for this. In my case I had to wait 10 seconds to sort my flowfiles. The process group below basically loops flowfiles around for a set period.

You can also apply your prioritizer to the unmatched queue, which should be large enough to hold the waiting flowfiles.

[Image: process group]

[Image: RouteOnAttribute]

[Image: UpdateAttribute]
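To make the buffering idea concrete outside NiFi, here is a minimal Python sketch of a hold-then-release buffer. It is not the process group above: it swaps the loop for a watermark-style rule (release an item once its event timestamp is at least `hold_seconds` behind the current clock), and it assumes event timestamps and the clock use the same time base. The names `ReorderBuffer`, `hold_seconds`, `add` and `release` are made up for illustration.

```python
import heapq
import itertools


class ReorderBuffer:
    """Hypothetical buffer: holds items, releases them in event-time order."""

    def __init__(self, hold_seconds):
        self.hold_seconds = hold_seconds
        self._heap = []                 # min-heap keyed on event timestamp
        self._tie = itertools.count()   # tie-breaker so payloads are never compared

    def add(self, event_ts, payload):
        heapq.heappush(self._heap, (event_ts, next(self._tie), payload))

    def release(self, now):
        """Return every item whose event time is at or before now - hold_seconds."""
        watermark = now - self.hold_seconds
        ready = []
        while self._heap and self._heap[0][0] <= watermark:
            event_ts, _, payload = heapq.heappop(self._heap)
            ready.append((event_ts, payload))
        return ready


buf = ReorderBuffer(hold_seconds=10)
buf.add(100, "a")
buf.add(99, "b")    # arrives after "a" but is older
buf.add(103, "c")   # still inside the hold window at now=110
print(buf.release(now=110))  # [(99, 'b'), (100, 'a')]
```

With this rule the output stays ordered as long as no item is late by more than `hold_seconds`; anything later than that still comes out behind newer items, so the hold time is a trade-off between latency and ordering.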

CodePudding user response:

I have something here that seems to work at low speed, but for the moment it breaks down at higher speed.

The first block is just a simulator that generates one flowfile per second. The UpdateAttribute can be the one that generates the priority attribute for sorting.

The rest is there to manage bursts.

  • Increment a counter for each flowfile
  • Wait for the counter to reach a limit
  • Once flowfiles go through, the counter is reset

Solution

The 1st Notify increments the counter by 1 for each flowfile.

1st Notify

The Wait lets flowfiles go through only once the counter reaches 5. The queue before the Wait uses PriorityAttributePrioritizer; all the other queues are set to FIFO.

Wait

The last Notify resets the counter to zero.

Last Notify
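The counter logic above can be sketched in plain Python to show what it does to ordering. This is only a rough model of the idea, not NiFi code: the class name `CountBatcher` and its methods are hypothetical, the batch size of 5 matches the Wait threshold mentioned above, and a lower priority value is assumed to mean "process first".

```python
class CountBatcher:
    """Hypothetical model: hold items until a count is reached, then release sorted."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self._pending = []

    def add(self, priority, payload):
        """Queue one item; return the sorted batch when the counter hits the limit."""
        self._pending.append((priority, payload))
        if len(self._pending) < self.batch_size:
            return []                      # counter below the limit: keep waiting
        batch = sorted(self._pending, key=lambda item: item[0])
        self._pending = []                 # counter reset
        return batch


batcher = CountBatcher(batch_size=5)
released = []
for priority in [3, 1, 2, 5, 4]:
    released = batcher.add(priority, f"ff{priority}")
print(released)
# [(1, 'ff1'), (2, 'ff2'), (3, 'ff3'), (4, 'ff4'), (5, 'ff5')]
```

Note that in this model ordering is only restored inside each batch: an item that arrives late enough to land in the next batch is still released after newer items from the previous one.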
