I'm implementing a cache using BehaviorSubject and multicast. The stream returned from the cache should start with an HTTP request. I should also be able to force a refresh of the cache by manually calling next on the subject. The common approach with two subjects outlined by Poul Kruijt is well known and suggested everywhere. My idea is to find a way to achieve this using only one subject throughout the lifecycle of the stream.
It would be easy to achieve with multicast like this:
const cache = new BehaviorSubject(null);
const shared = queryThatCompletes.pipe(multicast(cache)) as any;
// sets up subscription, waits for connect
shared.subscribe((values) => console.log(values));
// triggers http request
shared.connect();
setTimeout(() => {
  // will only receive COMPLETE from the subject
  shared.subscribe((values) => console.log(values));
}, 2000);
// force refresh the cache
cache.next(null);
But since the HTTP query stream completes, the second subscription doesn't get any value, just the COMPLETE notification from the subject. This behavior is described in detail here.
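The COMPLETE-only behavior can be reproduced without multicast. This is a minimal sketch, assuming RxJS 6 or 7 semantics: once a BehaviorSubject has received complete (called directly here, whereas in the question it is forwarded from the completed HTTP stream), the subject is stopped. A new subscriber gets neither the stored value nor any later next call. The names log and subject are illustrative only.

```typescript
import { BehaviorSubject } from 'rxjs';

const log: string[] = [];
const subject = new BehaviorSubject<string>('cached');

// Simulates the completed HTTP source forwarding its completion into the subject.
subject.complete();

// A late subscriber gets only the completion, not the stored value.
subject.subscribe({
  next: (value) => log.push(`next: ${value}`),
  complete: () => log.push('complete'),
});

// The subject is already stopped, so this call is silently ignored.
subject.next('refreshed');

console.log(log);
```

This is why a single subject instance cannot serve as both the multicast target and the refresh trigger when the source completes.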
The other option is to pass a factory function instead of the subject instance like this:
const cache = ()=> new BehaviorSubject(null);
const shared = queryThatCompletes.pipe(multicast(cache)) as any;
This re-creates the subject, which subscribes to queryThatCompletes and re-triggers the HTTP request. But the downsides are the need to call connect multiple times and the redundant queries.
const cache = () => new BehaviorSubject(null);
const shared = queryThatCompletes.pipe(multicast(cache)) as any;
// sets up subscription, waits for connect
shared.subscribe((values) => console.log(values));
// triggers http request
shared.connect();
setTimeout(() => {
  // sets up subscription, waits for connect
  shared.subscribe((values) => console.log(values));
  // triggers http request
  shared.connect();
}, 2000);
So I simply implemented an HTTP stream that doesn't complete by itself and use it like this:
import { BehaviorSubject, Observable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const queryOnceButDontComplete = new Observable((observer) => {
  fetch('https://jsonplaceholder.typicode.com/todos/1')
    .then((response) => response.json())
    // emit the response once, but never call observer.complete()
    .then((data) => observer.next(data));
  return () => {};
});
const cache = new BehaviorSubject(null);
const shared = queryOnceButDontComplete.pipe(multicast(cache)) as any;
// sets up subscription, waits for connect
shared.subscribe((values) => console.log(values));
// triggers http request
shared.connect();
setTimeout(() => {
  // late subscriber: receives the value the subject currently holds, no new request
  shared.subscribe((values) => console.log(values));
}, 2000);
This works, but I'm wondering if there's a way to achieve what I want without the use of a custom observable. Any ideas?
CodePudding user response:
The best option would be to just use shareReplay(1):
const shared = queryThatCompletes.pipe(shareReplay(1));
// the first subscription triggers the http request
shared.subscribe((values) => console.log(values));
setTimeout(() => {
  // a late subscription receives the replayed value, no new request
  shared.subscribe((values) => console.log(values));
}, 2000);
Not entirely sure what the subscribe and connect are doing there, but if you are just returning an HttpClient get call, then you should just return the shared observable, and whoever subscribes to it first will trigger the HTTP request. No need for connect. Any subsequent subscriptions will wait for the request to finish or receive the last emitted value from the observable.
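To check that claim without a real backend, here is a small sketch. It assumes a hypothetical fakeGet$ built with defer and of as a stand-in for the HttpClient get call, and a counter requestCount that records how often the "request" actually runs. Both names are made up for illustration.

```typescript
import { defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let requestCount = 0;

// Hypothetical stand-in for an HttpClient get call: emits once, then completes.
const fakeGet$ = defer(() => {
  requestCount++;
  return of({ id: 1, title: 'todo' });
});

const shared$ = fakeGet$.pipe(shareReplay(1));

const firstSeen: unknown[] = [];
const lateSeen: unknown[] = [];

// The first subscription runs the request.
shared$.subscribe((value) => firstSeen.push(value));
// A later subscription is served from the replay buffer.
shared$.subscribe((value) => lateSeen.push(value));

console.log(requestCount, firstSeen, lateSeen);
```

Even though the source has completed, the second subscriber still receives the buffered value, and the source is subscribed only once.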
Based on your comment, let's wrap this in a service (untested):
@Injectable()
export class SomeDataService {
  // typed as void so that next() can be called without an argument
  readonly refresh$ = new BehaviorSubject<void>(undefined);
  readonly get$ = this.httpClient.get(/*url here*/);
  readonly shared$ = this.refresh$.pipe(
    switchMap(() => this.get$),
    shareReplay(1)
  );
  constructor(private httpClient: HttpClient) {}
  getData(): Observable<unknown> {
    return this.shared$;
  }
  refreshData(): void {
    this.refresh$.next();
  }
}
Does this make sense? Basically you start with a refresh subject, which gets mapped to the actual network call. On the first subscription to getData(), the network request gets triggered. Any subscription to getData() after that will get the cached value. Calling refreshData will refresh the data for every subscription.
CodePudding user response:
I have made a functional approach that I believe is much simpler. It uses two observables, refresh$ and the desired observable. I hope this solution gives you some ideas.
// returns the cached observable together with a function that re-runs the source
const makeRestartableCachedObservable = <T>(ob: Observable<T>): [Observable<T>, () => void] => {
  const refresh$ = new BehaviorSubject(null);
  return [
    refresh$.pipe(
      switchMap(() => ob),
      shareReplay(1)
    ),
    () => refresh$.next(null),
  ];
};
const [ob, refreshFun] = makeRestartableCachedObservable(of('string'));
ob.subscribe(console.log);
setTimeout(() => { refreshFun(); }, 3000);
This lets you refresh your data with a much simpler API.
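The refresh pattern used in both answers (a refresh subject piped through switchMap and shareReplay(1)) can be checked with a counting source. This is only a sketch: makeRefreshableCache, countedRequest$ and refreshCalls are hypothetical names, and defer with of stands in for the real HTTP call.

```typescript
import { BehaviorSubject, Observable, defer, of } from 'rxjs';
import { shareReplay, switchMap } from 'rxjs/operators';

let refreshCalls = 0;

// Hypothetical stand-in for an HTTP request; each execution yields a new response.
const countedRequest$ = defer(() => of(`response #${++refreshCalls}`));

function makeRefreshableCache<T>(source$: Observable<T>): [Observable<T>, () => void] {
  const refresh$ = new BehaviorSubject<void>(undefined);
  const cached$ = refresh$.pipe(
    switchMap(() => source$), // every refresh re-subscribes to the source
    shareReplay(1)            // late subscribers get the latest response
  );
  return [cached$, () => refresh$.next()];
}

const [data$, refresh] = makeRefreshableCache(countedRequest$);

const subscriberA: string[] = [];
const subscriberB: string[] = [];

data$.subscribe((value) => subscriberA.push(value)); // runs the first request
data$.subscribe((value) => subscriberB.push(value)); // served from the cache
refresh();                                           // runs one more request for everyone

console.log(refreshCalls, subscriberA, subscriberB);
```

Because the refresh subject never completes, the shared stream stays open, which is what the custom non-completing observable in the question achieved by hand.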