Home > Mobile >  Implement stream with multicast that doesn't complete the underlying subject
Implement stream with multicast that doesn't complete the underlying subject

Time:04-13

I'm implementing the cache using BehaviorSubject and multicast. The stream returned from the cache should start with HTTP request. I should also be able to force refresh the cache by triggering manually next on subject. The common approach with two subjects outlined by Poul Kruijt is well known and suggested everywhere. My idea is to find a way to achieve the following using only one subject throughout lifecycle of a stream.

It would be easy to achieve with multicast like this

const cache = new BehaviorSubject(null);
const shared = queryThatCompletes.pipe(multicast(cache)) as any;

// sets up subscription, waits for connect
shared.subscribe((values) => console.log(values));

// triggers http request
shared.connect();

setTimeout(() => {
  // will only emit COMPLETE from subject
  shared.subscribe((values) => console.log(values));
}, 2000);

// force refresh the cache
cache.next();

but since the HTTP query stream completes, the second subscription doesn't get any value, just COMPLETE notification from subject. This behavior is described in detail here.

The other option is to pass a factory function instead of the subject instance like this:

const cache = ()=> new BehaviorSubject(null);
const shared = queryThatCompletes.pipe(multicast(cache)) as any;

This will re-create the subject, that will subscribe to queryThatCompletes and re-trigger HTTP request. But the downsides is the need to call connect multiple times and redundant queries.

const cache = () => new BehaviorSubject(null);
const shared = queryThatCompletes.pipe(multicast(cache)) as any;

// sets up subscription, waits for connect
shared.subscribe((values) => console.log(values));

// triggers http request
shared.connect();

setTimeout(() => {
// sets up subscription, waits for connect
  shared.subscribe((values) => console.log(values));

  // triggers http request
  shared.connect();
}, 2000);

So I simply implemented HTTP stream that doesn't complete by itself and use it like this:

const queryOnceButDontComplete = new Observable((observer) => {
  fetch('https://jsonplaceholder.typicode.com/todos/1')
    .then(response => response.json())
    .then(data => observer.next(data));

  return () => {};
});

const cache = new BehaviorSubject(null);
const shared = queryOnceButDontComplete.pipe(multicast(cache)) as any;

// sets up subscription, waits for connect
shared.subscribe((values) => console.log(values));

// triggers http request
shared.connect();

setTimeout(() => {
// sets up subscription, waits for connect
  shared.subscribe((values) => console.log(values));
}, 2000);

This works, but I'm wondering if there's a way to achieve what I want without the use of custom observable. Any ideas?

CodePudding user response:

Best would be to just use a shareReplay(1):

const shared = queryThatCompletes.pipe(shareReplay(1));

// sets up subscription, waits for connect
shared.subscribe((values) => console.log(values));

// triggers http request
shared.connect();

setTimeout(() => {
  shared.subscribe((values) => console.log(values));
}, 2000);

Not entirely sure what the subscribe and connect are doing there, but if you are just returning a HttpClient get call, then you should just return the shared observable, and whomever subscribes first to it, will trigger the http request. No need for connect. Any subsequent subscriptions will wait for the request to finish or receive the last emitted value from the observable.


Based on your comment, let's wrap this in a service (untested):

@Injectable()
SomeDataService {
  readonly refresh$ = new BehaviorSubject(undefined);
  
  readonly get$ = this.httpClient.get(/*url here*/);
  
  readonly shared$ = this.refresh$.pipe(
    switchMap(() => this.get$),
    shareReplay(1)
  );

  constructor(private httpClient: HttpClient) {}

  getData(): Observable<unknown> {
    return this.shared$;
  }

  refreshData(): void {
    this.refresh$.next();
  }
}

Does this make sense? Basically you start with a refresh subject, which get mapped to the actual network call. On the first getData(), the network request gets triggered. Any call on getData() after that will get the cached value. Calling refreshData will refresh the data for any subscription

CodePudding user response:

I have made functional approach that I believe is much more simpler it uses two observables refresh$ and the desired observable, I hope this solution might give you some thoughts

 const makeRestartableCahcedObservable = (ob:Observable<any>)=>{
  const refresh$ = new BehaviorSubject(null)
  return [
    refresh$.pipe(
    switchMap(()=>ob),
    shareReplay(1)
  ),
  ()=>refresh$.next(null)
]
}

const [ob, refreshFun] = makeRestartableCahcedObservable(of('string'))

ob.subscribe(console.log)

setTimeout(()=>{refreshFun()}, 3000)

this basically can refresh your data with much more simpler API

  • Related