In my Angular app, I'm manually validating some form data from JSON.
To do that, I'm subscribing to a Subject that receives a change event:
private _contentChanged = new Subject<IEditorContentChangedEvent>();
contentChanged$: Observable<IEditorContentChangedEvent>;

constructor() {
  this.contentChanged$ = this._contentChanged.asObservable();
}

onContentChanged(event: IEditorContentChangedEvent): void {
  this._contentChanged.next(event);
}

ngOnInit(): void {
  this.contentChanged$
    .pipe(
      untilDestroyed(this),
      debounceTime(1000),
      filter(event => Boolean(event)),
      map(() => {
        this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
        console.log(this.resource);
        this.parentForm.setValue(this.resource);
        return of(event);
      }),
      catchError((error: Error) => {
        ...
        return EMPTY;
      }),
    )
    .subscribe(() => {
      ...
    });
}
I see the log statement in map() the first time contentChanged$ receives a value, but not the second time if the code in map() throws an error.
I'm assuming this is because catchError() returns EMPTY. I've been following guides such as this article on Medium, but no luck so far.
I have tried returning of(error) in the catchError() handler, but that doesn't work either.
If I just subscribe to contentChanged$ without any operators, I see every new value that _contentChanged receives, so my source of data is working as expected.
If I make a change that does not cause catchError() to be invoked, I see the log statement in map(), so it has to be catchError() that's closing the stream.
How can I keep the Observable/Subscription alive to process new contentChanged$ values?
CodePudding user response:
map(() => {
  this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
  console.log(this.resource);
  this.parentForm.setValue(this.resource);
  return of(event);
}),
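Replace the snippet above with the one below:
map(event => {
  try {
    this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
    console.log(this.resource);
    // setValue() also throws if the object does not match the form's shape,
    // so it belongs inside the try block as well
    this.parentForm.setValue(this.resource);
  } catch (err) {
    // Handled here, so the error never becomes an error notification
    console.error(err);
  }
  return event;
}),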
CodePudding user response:
The Observable Contract
Here's an overview: link
The relevant part:
Upon issuing a complete or error notification, an observable may not thereafter issue any further notifications.
This means that once an observable completes or errors, it's done. They are terminal emissions/notifications.
One Solution:
Don't let your observable emit an error. If you know that some synchronous code in your map is throwing an error, you can catch it and deal with it right there, so it never propagates into your observable:
try {
  this.resource = JSON.parse(/* ... */);
} catch (err) {
  console.error(err);
}
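The same idea can be pulled out of the operator into a small helper that never throws. This is only a sketch: `safeParse` and `ParseResult` are hypothetical names introduced for illustration, not part of RxJS or Angular, and the `resourceType` field is just sample data.

```typescript
// Hypothetical helper (not an RxJS or Angular API): wraps JSON.parse so that
// a malformed string yields a result value instead of a thrown exception.
type ParseResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: Error };

function safeParse<T>(text: string): ParseResult<T> {
  try {
    // JSON.parse throws a SyntaxError on malformed input
    return { ok: true, value: JSON.parse(text) as T };
  } catch (err) {
    // Normalize whatever was thrown into an Error instance
    const error = err instanceof Error ? err : new Error(String(err));
    return { ok: false, error };
  }
}

// Sample inputs, assumed for illustration only
const good = safeParse<{ resourceType: string }>('{"resourceType":"Patient"}');
const bad = safeParse<{ resourceType: string }>('{"resourceType":');

console.log(good.ok); // true
console.log(bad.ok);  // false
```

Inside map you could call such a helper and only touch the form when `ok` is true, so a bad edit is logged and skipped while the subscription stays open.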
Another Solution:
Once the source observable errors, just re-subscribe to it. Whether this works depends on what side effects are created when you subscribe to your source in the first place.
this.contentChanged$.pipe(
  untilDestroyed(this),
  debounceTime(1000),
  filter(event => Boolean(event)),
  map(() => {
    ...
  }),
  tap({ error: (error: Error) => console.error(error) }),
  // By default, if contentChanged$ keeps erroring, this will
  // keep retrying forever.
  retry()
).subscribe(() => {
  ...
});
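To see the difference side by side, here is a minimal sketch that feeds the same values through a catchError/EMPTY pipeline and a retry() pipeline. It assumes RxJS 6 or 7 is installed; the plain `Subject<string>` stands in for `_contentChanged`, and the debounce and Angular pieces are left out so everything runs synchronously.

```typescript
import { EMPTY, Subject } from 'rxjs';
import { catchError, map, retry } from 'rxjs/operators';

// Stand-in for _contentChanged: each value is a raw JSON string.
const source = new Subject<string>();

const withCatch: unknown[] = [];
const withRetry: unknown[] = [];

// Variant 1: catchError swaps the errored stream for EMPTY, which completes,
// so this subscription ends after the first failure.
source.pipe(
  map(text => JSON.parse(text) as unknown),
  catchError(() => EMPTY),
).subscribe(value => withCatch.push(value));

// Variant 2: retry() re-subscribes to the source after every error,
// so later values are still processed.
source.pipe(
  map(text => JSON.parse(text) as unknown),
  retry(),
).subscribe(value => withRetry.push(value));

source.next('{"id":1}');
source.next('not json'); // JSON.parse throws inside map
source.next('{"id":2}');

console.log(withCatch.length); // 1
console.log(withRetry.length); // 2
```

Re-subscribing is cheap here because a Subject is hot and has no subscription side effects; with a source that starts an HTTP request or similar on subscribe, every retry would repeat that work.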
You can also conditionally retry by returning an observable from catchError:
this.contentChanged$.pipe(
  untilDestroyed(this),
  debounceTime(1000),
  filter(event => Boolean(event)),
  map(() => {
    ...
  }),
  // The second argument is the observable that errored;
  // returning it re-subscribes to it.
  catchError((error: Error, source$) => {
    console.error(error);
    if (/* retry? */) {
      return source$;
    } else if (/* Do nothing? */) {
      return EMPTY;
    } else if (/* Emit some string values? */) {
      return of("Hello", "World");
    }
    // Otherwise rethrow the error!
    return throwError(() => error);
  })
).subscribe(() => {
  ...
});