I have a service that exposes some global state using an observable. The state must be initialized by making an HTTP call. I want all consumers of the state to receive the same value and to receive the current value when they subscribe to the observable.
This scenario is typically handled using a Subject, which can be initialized by subscribing it to the observable returned by the HTTP request.
However, I don't want the HTTP request to be executed until the first time something subscribes to the global state observable. If you subscribe the Subject to the HTTP observable up front, the request is executed at that point, not when the first consumer subscribes.
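To illustrate the eager behavior described above, here is a minimal sketch. It assumes RxJS is installed; `fakeRequest$` and `requestCount` are hypothetical stand-ins for the real HTTP observable and are not part of the service.

```typescript
import { Observable, ReplaySubject } from 'rxjs';

// Hypothetical stand-in for httpClient.get<string>('https://...'):
// a cold observable that counts how many times it is executed.
let requestCount = 0;
const fakeRequest$ = new Observable<string>(subscriber => {
  requestCount++;
  subscriber.next('state');
  subscriber.complete();
});

const eagerSubject = new ReplaySubject<string>(1);

// Subscribing the subject to the request executes the request right here,
// even though no consumer has subscribed to eagerSubject yet.
fakeRequest$.subscribe(eagerSubject);

const requestsBeforeAnyConsumer = requestCount;
```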
I have the following code in the service. This works, but it seems like there should be a simpler way to do this.
private initialized = false;
private readonly subject = new ReplaySubject<string>(1);

constructor(private readonly httpClient: HttpClient) {}

get data$(): Observable<string> {
  return new Observable<string>(subscriber => {
    if (!this.initialized) {
      this.refresh();
    }
    let subscription = this.subject.subscribe(value => subscriber.next(value));
    return () => subscription.unsubscribe();
  });
}

refresh(): void {
  this.httpClient.get<string>('https://...')
    .pipe(
      tap(() => this.initialized = true)
    )
    .subscribe(value => this.subject.next(value));
}
Sandbox: https://codesandbox.io/s/reverent-almeida-d2zbjx?file=/src/app/test.service.ts
CodePudding user response:
The following should accomplish what you want. The data$ field is the merge of two observables: the first is the initial HTTP call, and the second listens for emissions from refreshSubject and calls the service again.
The initial call won't execute until the first subscription is made, and any emission from refreshSubject before that point is ignored as well.
Finally, shareReplay causes all subsequent subscribers to receive the last result. The refCount option is set so that when the subscriber count drops to zero, the source subscription is torn down, and the next subscriber triggers the initial call again. If this behavior is not desirable, you can use shareReplay(1) instead, which keeps the last result and continues to listen to refreshSubject.
readonly refreshSubject = new Subject<void>();

readonly data$ = merge(
  // initial request, executed on the first subscription
  this.httpClient.get<string>('https://...'),
  // every emission of refreshSubject triggers a new request
  this.refreshSubject.pipe(
    switchMap(() => this.httpClient.get<string>('https://...'))
  )
).pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
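The behavior described in this answer can be checked with a small standalone sketch. It assumes RxJS is installed; `fakeGet` and `requestCount` are hypothetical stand-ins for the HTTP call, and the observable emits synchronously only to keep the example short.

```typescript
import { Observable, Subject, defer, merge, of } from 'rxjs';
import { shareReplay, switchMap } from 'rxjs/operators';

// Hypothetical stand-in for httpClient.get<string>('https://...').
// Each subscription counts as one request.
let requestCount = 0;
const fakeGet = (): Observable<string> =>
  defer(() => of(`response #${++requestCount}`));

const refreshSubject = new Subject<void>();
const data$ = merge(
  fakeGet(),
  refreshSubject.pipe(switchMap(() => fakeGet()))
).pipe(shareReplay({ bufferSize: 1, refCount: true }));

// Nothing has subscribed yet, so no request has been made.
const countBeforeSubscribe = requestCount;

const received: string[] = [];
const first = data$.subscribe(v => received.push(`A:${v}`));  // triggers request #1
const second = data$.subscribe(v => received.push(`B:${v}`)); // replays the cached value
refreshSubject.next();                                        // triggers request #2 for both

// With refCount: true, dropping to zero subscribers resets the shared state,
// so the next subscriber starts over with a fresh initial request.
first.unsubscribe();
second.unsubscribe();
data$.subscribe(v => received.push(`C:${v}`));
```

With `shareReplay(1)` instead, the last subscriber in this sketch would be expected to receive the cached second response rather than trigger a third request.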
CodePudding user response:
I'm not sure what the subscribe and unsubscribe in your data$ getter are doing. Does this simpler version (returning subject.asObservable()) also meet your requirements?
get data$(): Observable<string> {
  if (!this.initialized) {
    this.refresh();
  }
  return this.subject.asObservable();
}
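Note that a getter like this runs its body when the property is read, not when the returned observable is subscribed to. If the request has to wait for the first subscription, one option is the RxJS `defer` function, which postpones its factory until subscription time. The following is only a sketch under assumptions: `TestService`, `fakeGet`, and `requestCount` are hypothetical stand-ins, and the flag is set before the request starts (unlike the question's code) so that early subscribers do not start duplicate requests.

```typescript
import { Observable, ReplaySubject, defer, of } from 'rxjs';

class TestService {
  private initialized = false;
  private readonly subject = new ReplaySubject<string>(1);
  requestCount = 0; // demonstration only

  // Hypothetical stand-in for httpClient.get<string>('https://...').
  private fakeGet(): Observable<string> {
    return defer(() => of(`response #${++this.requestCount}`));
  }

  // The factory passed to defer runs once per subscription,
  // so the first subscriber is the one that starts the request.
  readonly data$: Observable<string> = defer(() => {
    if (!this.initialized) {
      this.initialized = true;
      this.refresh();
    }
    return this.subject.asObservable();
  });

  refresh(): void {
    this.fakeGet().subscribe(value => this.subject.next(value));
  }
}

const testService = new TestService();
const lazyStream = testService.data$;            // reading the property makes no request
const requestsAfterRead = testService.requestCount;

const seen: string[] = [];
lazyStream.subscribe(v => seen.push(`A:${v}`));  // first subscription starts the request
lazyStream.subscribe(v => seen.push(`B:${v}`));  // later subscribers get the replayed value
```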
CodePudding user response:
What I tend to do in similar situations is to create a service that works like this.
- It exposes methods to fire the execution of commands, e.g. execute the HTTP call
- It exposes Observables to which clients can subscribe when they want to react to changes
- It internally uses Subjects to trigger the notifications in the public Observables when the execution of the commands returns something (or errors)
So, in code, it would look something like this:
export class MyService {
  private _subject = new ReplaySubject<any>(1);
  public data$ = this._subject.asObservable();

  constructor(private readonly httpClient: HttpClient) {}

  public executeRemoteService(inputParams: any) {
    // the request is executed as soon as a client calls this method
    // the subject notifies, errors and completes mirroring what the
    // http client does; note that once the subject has completed or
    // errored, later calls can no longer emit through it
    this.httpClient.get<string>('https://...')
      .pipe(
        // tap needs an observer, so pass the subject, not the data$ observable
        tap(this._subject)
      )
      .subscribe();
  }
}
Any client interested in data$, i.e. interested in changes in the global state, has to subscribe to data$, and this can happen at any moment.
When it is time to fire the service call, any component can call the executeRemoteService method, which then triggers the notifications in data$.
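As a rough usage sketch of this command/notification split, assuming RxJS is installed: `DemoService` and `fakeGet` are hypothetical stand-ins for the service and the HTTP call. Unlike the code above, this sketch forwards only values to the subject, which is an assumption made here so that the subject stays open across several calls.

```typescript
import { Observable, ReplaySubject, defer, of } from 'rxjs';

class DemoService {
  private readonly _subject = new ReplaySubject<string>(1);
  readonly data$: Observable<string> = this._subject.asObservable();

  // Hypothetical stand-in for httpClient.get<string>('https://...').
  private fakeGet(param: string): Observable<string> {
    return defer(() => of(`result for ${param}`));
  }

  // Command: runs the request and pushes the result to all data$ subscribers.
  executeRemoteService(param: string): void {
    this.fakeGet(param).subscribe(value => this._subject.next(value));
  }
}

const demoService = new DemoService();
const log: string[] = [];

// A client can subscribe before any command has been executed...
demoService.data$.subscribe(v => log.push(`early:${v}`));
demoService.executeRemoteService('a');

// ...or afterwards, in which case it first receives the replayed last value.
demoService.data$.subscribe(v => log.push(`late:${v}`));
demoService.executeRemoteService('b');
```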