I have the following infinite stream:
Observable<MyEvent> eventStream;
Many events are posted on this stream, and it stops only when the app stops.
I need to listen for two events:
MyEvent.START and MyEvent.STOP
When the START event is emitted, I run a function that can emit STOP. If I don't receive the STOP event within a certain time, I want to rerun this function. If I still haven't received STOP after the third run, it should stop retrying.
So I need something like this:
eventStream.filter(e -> e == START || e == STOP)
.repeat(3)
.takeUntil(/* while i dont get STOP*/)
.delay(2, TimeUnit.SECONDS)
.subscribe(this::onStart, this::onError)
/*
* By the way, I don't want a delay before the first run of the function.
*/
But retry only works when there is an error signal, and repeat needs a completion signal.
I know it's possible, but I don't know how to implement it (in an Rx way).
CodePudding user response:
If eventStream is hot, you can use it for takeUntil too:
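eventStream.filter(e -> e == START)
.flatMap(start -> Observable.just(start)
    .observeOn(Schedulers.computation())
    // run the function; the first run happens without a delay
    .doOnNext(s -> onStart(s))
    // give STOP 2 seconds to arrive before the next run
    .delay(2, TimeUnit.SECONDS)
    // at most 3 runs in total
    .repeat(3)
    // stop rerunning as soon as STOP is seen on the hot stream
    .takeUntil(eventStream.filter(e -> e == STOP))
)
.subscribe(/* ... */);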
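
For comparison, here is the same rule written as plain blocking Java. This is not Rx and not a replacement for the chain; it is only a rough reference for the intended behavior (run immediately, wait for STOP, rerun, give up after the third run) that you could check the Rx version against. RetryUntilStop and runUntilStopped are hypothetical names, and a CountDownLatch stands in for the STOP event; the 100 ms timeout is an arbitrary value for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of RxJava: a blocking version of the retry rule.
final class RetryUntilStop {

    // Runs `action` up to maxRuns times. After each run, waits up to
    // timeoutMillis for `stop` to be counted down. Returns true if STOP was
    // observed, false if every run timed out or the thread was interrupted.
    static boolean runUntilStopped(Runnable action, CountDownLatch stop,
                                   int maxRuns, long timeoutMillis) {
        for (int run = 0; run < maxRuns; run++) {
            action.run(); // the first run happens immediately, with no delay
            try {
                if (stop.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return true; // STOP arrived in time: no more reruns
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
        return false; // gave up after maxRuns runs without seeing STOP
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch stop = new CountDownLatch(1);
        // Assumed demo action: it "emits STOP" only on its second run.
        boolean stopped = runUntilStopped(() -> {
            if (runs.incrementAndGet() == 2) {
                stop.countDown();
            }
        }, stop, 3, 100);
        // prints "stopped=true, runs=2"
        System.out.println("stopped=" + stopped + ", runs=" + runs.get());
    }
}
```

Like the Rx chain with delay before repeat, this waits after every run, including the last one, before giving up.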