Home > Back-end >  Wait until previous event finish Rx
Wait until previous event finish Rx

Time:04-29

In my code, I have events that need to be processed sequentially and others in parallel.

I managed to do the parallel part but I can't do the sequential version.

event.ofType(Message.class)
    .groupBy(Message::getGroup)
    .flatMap(g -> {
        if (g.getKey() == Message.GROUP_PARALLEL)
            return g.flatMap(m -> Observable.just(m)
                        .observeOn(Schedulers.computation())
                        .doOnNext(this::send)
                        .delay(m.getRetryInterval(), TimeUnit.SECONDS)
                        .repeat(m.getRetryCount())
                        .takeUntil(event.ofType(Sent.class)
                            .filter(a -> a.getId() == m.getId())
                        )
                        .map(s -> m)
                    );
        else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
            // ?
        }
    })

I would like to implement it like this :

When an event is received
    If the previous event is still being processed, wait before emission
    Process the event (blocking)

So:

    event.subscribeOn(Schedulers.computation())
        .waitUntilPreviousComplete()
        .flatMap(this::processEvent) // we assume it take 1 seconds to complete
        .susbcribe(this::processed, this::onError);

    event.onNext(Event.A)
    event.onNext(Event.B)
    event.onNext(Event.C)

Should result:

    12h00:00 processed call with event A
    12h00:01 processed call with event B
    12h00:02 processed call with event C

So i want something looking like a Queue. I tried weird things with HashMap and Subjects but Im sure there is a clean way to do it. How can I implement that ?

CodePudding user response:

It sounds like you're looking for concatMap. That operator will subscribe to items emitted by upstream sequentially, and won't move on to the next item until the previous on completes. So your example code would look like this:

event.subscribeOn(Schedulers.computation())
    .concatMap(this::processEvent)
    .subscribe(this::processed, this::onError);
  • Related