Home > Software engineering >  RxJS Subscription only works one time
RxJS Subscription only works one time

Time:04-21

In my Angular app, I'm manually validating some form data from JSON.

As such, I'm subscribing to a Subject that receives a change event:

private _contentChanged = new Subject<IEditorContentChangedEvent>();
contentChanged$: Observable<IEditorContentChangedEvent>;

constructor() {
  this.contentChanged$ = this._contentChanged.asObservable();
}

onContentChanged(event: IEditorContentChangedEvent): void {
  this._contentChanged.next(event);
}

ngOnInit(): void {
  this.contentChanged$
      .pipe(
        untilDestroyed(this),
        debounceTime(1000),
        filter(event => Boolean(event)),
        map(() => {
          this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
          console.log(this.resource);
          this.parentForm.setValue(this.resource);

          return of(event);
        }),
        catchError((error: Error) => {
          ...

          return EMPTY;
        }),
      )
      .subscribe(() => {
        ...
      });
}

I see the log statement in map() the first time contentChanged$ receives a value, but not the second time if the code in map throws an error.

I'm assuming this is because catchError() returns EMPTY. I've been following guides such as this article on Medium, but no luck so far.

I have tried returning: of(error) in the catchError() handler, but that doesn't work either.

If I just subscribe to contentChanged$ without any operators, I see each time _contentChanged has received a new value, so my source of data is working as expected.

If I make a change that does not cause catchError to be invoked, I see the log statement in map, so it has to be catchError that's closing the stream.

How can I keep the Observable/Subscription alive to process new contentChanged$ values?

CodePudding user response:

  map(() => {
    this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
    console.log(this.resource);
    this.parentForm.setValue(this.resource);

    return of(event);
  }),

Modify above snippet with below one

  map(() => {
    try {
      this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
    } catch(err){
      console.error(err);
    }
    console.log(this.resource);
    this.parentForm.setValue(this.resource);

    return event;
  }),

CodePudding user response:

The Observable Contract

Here's an overview: link

The relevant part:

Upon issuing an complete or error notification, an observable may not thereafter issue any further notifications.

This means that once an observable completes or errors, it's done. They are terminal emissions/notifications.


One Solution:

Don't let your observable emit an error. If you know that some synchronous code in your map is throwing an error, You can catch it and deal with it there and then it'll never propagate up into your observable:

try {
  this.resource = JSON.parse(/* ... */);
} catch(err){
  console.error(err);
}

Another Solution

Once the source observable errors, just re-subscribe to your source observable. Whether this works depends on what side effects are created when you subscribe to your source in the first place.

this.contentChanged$.pipe(
  untilDestroyed(this),
  debounceTime(1000),
  filter(event => Boolean(event)),
  map(() => {
    ...
  }),
  tap({error: (error: Error) => console.error(error) }),
  // By default, if contentChanged$ keeps erroring, this will
  // keep retrying for forever.
  retry()
).subscribe(() => {
  ...
});

You can also conditionally retry by returning an observable from catchError

this.contentChanged$.pipe(
  untilDestroyed(this),
  debounceTime(1000),
  filter(event => Boolean(event)),
  map(() => {
    ...
  }),
  catchError((error: Error, source$) => {
    console.error(error)
    if (/* retry? */){
      return source$;
    } else if (/* Do nothing? */) {
      return EMPTY;
    } else if (/* Emit some string values? */) {
      return of("Hello", "World");
    } 

    // Otherwise rethrow the error!
    return throwError(() => error);
  })
).subscribe(() => {
  ...
});
  • Related