Repeat using condition on infinite stream RxJava

Time:04-21

I have the following infinite stream:

Observable<MyEvent> eventStream;

Many events are posted on this stream, and it stops only when the app stops.

There are two events that I need to listen for:

MyEvent.START & MyEvent.STOP

When the START event is emitted, I run a function that can cause STOP to be emitted. If I don't get the STOP event within a certain time, I want to rerun this function. If I still don't have the STOP event after the third run, it should stop retrying.

So I need something like this:

eventStream.filter(e -> e == START || e == STOP)
   .repeat(3)
   .takeUntil(/* while i dont get STOP*/)
   .delay(2, TimeUnit.SECONDS)
   .subscribe(this::onStart, this::onError)
   
/*
* By the way, I don't want a delay before the first run of the function
*/

But retry only works when there is an error signal, and repeat needs a completion signal, and an infinite stream produces neither.

I know it's possible, but I don't know how to implement it (in an Rx way).


Answer:

If eventStream is hot, you can use it for takeUntil too:

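eventStream.filter(e -> e == START)
.flatMap(start -> Observable.just(start)
             .observeOn(Schedulers.computation())
             // run the function right away, then wait 2 seconds before the next attempt
             .doOnNext(s -> onStart(s))
             .delay(2, TimeUnit.SECONDS)
             // at most 3 attempts in total
             .repeat(3)
             // a STOP event cancels any remaining attempts
             .takeUntil(eventStream.filter(stop -> stop == STOP))
)
.subscribe(/* ... */);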
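To check the behaviour of this chain without waiting in real time, here is a minimal sketch that drives it with virtual time. It assumes RxJava 3 (the `io.reactivex.rxjava3` packages; adjust the imports for RxJava 2). The `PublishSubject` standing in for the hot `eventStream`, the `countRuns` helper, the `OTHER` event and the counter that replaces `onStart` are all hypothetical names for illustration, and `observeOn` is left out so the run stays deterministic.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RetryUntilStopDemo {
    enum MyEvent { START, STOP, OTHER }

    // Returns how many times the start action ran.
    // A negative stopAfterSeconds means STOP never arrives.
    static int countRuns(long stopAfterSeconds) {
        TestScheduler scheduler = new TestScheduler();                  // virtual clock
        PublishSubject<MyEvent> eventStream = PublishSubject.create();  // hot stream
        AtomicInteger runs = new AtomicInteger();                       // stands in for onStart

        eventStream.filter(e -> e == MyEvent.START)
            .flatMap(start -> Observable.just(start)
                .doOnNext(s -> runs.incrementAndGet())       // first run happens immediately
                .delay(2, TimeUnit.SECONDS, scheduler)       // wait before completing
                .repeat(3)                                   // at most 3 runs in total
                .takeUntil(eventStream.filter(e -> e == MyEvent.STOP)))
            .subscribe();

        eventStream.onNext(MyEvent.OTHER);   // unrelated events are ignored
        eventStream.onNext(MyEvent.START);
        if (stopAfterSeconds >= 0) {
            scheduler.advanceTimeBy(stopAfterSeconds, TimeUnit.SECONDS);
            eventStream.onNext(MyEvent.STOP);
        }
        scheduler.advanceTimeBy(10, TimeUnit.SECONDS);  // let any remaining attempts play out
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(countRuns(-1)); // STOP never arrives
        System.out.println(countRuns(3));  // STOP arrives after the second run
        System.out.println(countRuns(1));  // STOP arrives before any rerun
    }
}
```

Because `doOnNext` sits before `delay`, the first run is not delayed, and each 2-second wait comes after a run rather than before it. With these assumptions the counter should reach 3 when STOP never arrives, and fewer when STOP comes in earlier.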