I need to build an rxjs operator, which I'll call latest. Here's an example of a Karma test demonstrating usage:
const root = new Subject<string>();
const inner = new Subject<number>();
const obj = { 'foo': inner };
const sut = root.pipe(latest(value => obj[value])); // latest operator applied
root.next('foo');
inner.next(42);
const spy = jasmine.createSpy();
sut.subscribe(spy);
expect(spy).toHaveBeenCalledWith(42);
Here are the actual requirements:
- It is like BehaviorSubject: any future subscribers to sut will immediately be nexted with the most recently received valid value. Note that it must subscribe to the root observable when the operator is called and immediately begin caching values for a future catch-up subscriber, even before anyone has subscribed to sut yet.
- It is like switchMap: when the root observable emits a new value, that value is converted (via the input factory function) to a new observable (which I'll call the "inner observable") whose values are monitored for the output.
- It is like share: it needs to multicast and to continue to process new subscriptions even after an inner observable has errored or completed. In fact, when a new subscription occurs after the inner observable errors, this new subscriber needs to get the most recent valid value immediately as though no error had happened.
- It needs to emit no values to any subscriber until an inner observable has emitted a value.
- It needs to error and release all current subscribers when the inner observable errors.
- It needs to error/complete and release all current and future subscribers when the root observable errors/completes respectively.
Below is my code for this:
function latest<TIn, TOut>(factory: (input: TIn) => Observable<TOut>): OperatorFunction<TIn, TOut> {
  return (src: Observable<TIn>) => {
    const subscribers: Subscriber<TOut>[] = [];
    let hasValue = false;
    let latestValue: TOut = undefined;
    let outerSubscription: Subscription = null, innerSubscription: Subscription = null;
    let rootError: any, rootCompleted = false, rootErrorReceived = false;
    outerSubscription = src.subscribe((value: TIn) => {
      if (innerSubscription) {
        innerSubscription.unsubscribe();
      }
      innerSubscription = factory(value).subscribe((innerValue: TOut) => {
        hasValue = true;
        latestValue = innerValue;
        subscribers.forEach(s => s.next(latestValue));
      }, error => {
        innerSubscription = null;
        subscribers.splice(0, subscribers.length).forEach(s => s.error(error));
      }, () => {
        innerSubscription = null;
      });
    }, error => {
      rootErrorReceived = true, rootError = error;
      subscribers.splice(0, subscribers.length).forEach(s => s.error(error));
      if (innerSubscription) {
        innerSubscription.unsubscribe();
        innerSubscription = null;
      }
    }, () => {
      rootCompleted = true, latestValue = undefined;
      if (innerSubscription) {
        innerSubscription.unsubscribe();
        innerSubscription = null;
      }
      subscribers.splice(0, subscribers.length).forEach(s => s.complete());
    });
    return new Observable<TOut>((subscriber: Subscriber<TOut>) => {
      if (rootCompleted) {
        subscriber.complete();
        return undefined;
      }
      if (rootErrorReceived) {
        subscriber.error(rootError);
        return undefined;
      }
      subscribers.push(subscriber);
      if (hasValue) {
        subscriber.next(latestValue);
      }
      return () => {
        const i = subscribers.indexOf(subscriber);
        if (i !== -1) {
          subscribers.splice(i, 1);
        }
      };
    });
  };
}
It works correctly according to all my requirements, but there simply must be a more rxjs way of doing this. I've tried several combinations of share, switchMap, catchError, onErrorResumeNext, Subject, ConnectableObservable, and BehaviorSubject, but I can't figure out the right one, especially with the need to subscribe immediately and handle future subscribers after error.
I'm on rxjs 6.6.2, but I can (probably) update if need be.
CodePudding user response:
You can use a combination of switchMap and shareReplay like this:
root.pipe(switchMap(value => obj[value]), shareReplay(1))
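One caveat: shareReplay(1) only subscribes to its source when the first subscriber arrives, so with a plain Subject as root, the values emitted before sut.subscribe(...) in the question's test would be missed. A minimal sketch of one possible workaround on rxjs 6, assuming it is acceptable to swap shareReplay for publishReplay(1) plus an explicit connect() (the `received` array and `connection` variable are just illustrative names, not from the question):

```typescript
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { publishReplay, switchMap } from 'rxjs/operators';

const root = new Subject<string>();
const inner = new Subject<number>();
const obj: { [key: string]: Observable<number> } = { foo: inner };

// publishReplay(1) multicasts through a ReplaySubject that buffers one value.
const sut = root.pipe(
  switchMap(value => obj[value]),
  publishReplay(1),
) as ConnectableObservable<number>;

// connect() subscribes to root right away, before any subscriber exists,
// so caching starts immediately.
const connection = sut.connect();

root.next('foo');
inner.next(42);

// A late subscriber is caught up with the cached value, then sees new ones.
const received: number[] = [];
sut.subscribe(v => received.push(v));
inner.next(43);

console.log(received); // [42, 43]

// Tear down the eager subscription once it is no longer needed.
connection.unsubscribe();
```

This only covers the eager-subscription and catch-up requirements. An inner error still propagates through switchMap and terminates the shared ReplaySubject, which then replays the buffered value followed by the error to later subscribers, so the "as though no error had happened" requirement is not met by this sketch. Also note that publishReplay is deprecated in rxjs 7.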
CodePudding user response:
What you are looking for is probably a combination of skipUntil() and share():
const heartbeat = timer(0, 1000);
const waiter = of('go').pipe(delay(5000));
const myHotObservable = heartbeat.pipe(skipUntil(waiter), share());
myHotObservable.subscribe(console.log);
setTimeout(() => {
  myHotObservable.subscribe((v) => console.log('2', v));
}, 5000);