Why subject.next(2);
not works? I know it may happens because the signal stream got error.
But I add catchError
and I return a EMPTY
in the pipeline and it make it complete.
So seems catchError
not helping me either.
How to make subject.next(2)
works?
My code is calling to doWork
function. inside I invoke work
function. if it's one I simulate an error at the first time I call the function, after that should not have error.
Then I expose the subject and the subject-pipeline to do something else after the pipeline is invoked. I use share
for that (to not duplicate my flow) and it should be happens only once.
import { EMPTY, of, Subject, throwError } from 'rxjs';
import { exhaustMap, share, take, tap } from 'rxjs/operators';
console.clear();
const doWork = () => {
const subject = new Subject();
const work = (n) => {
console.log('in work fn', n);
if (n === 1) {
throw new Error('bla');
}
return of([]);
};
const pipe = subject.pipe(
exhaustMap((n) => work(n)),
share()
);
pipe.subscribe({
next: () => console.log('in subjectPipe next'),
error: () => console.log('in subjectPipe error'),
complete: () => console.log('in subjectPipe complete'),
});
return { subject, pipe };
};
const { subject, pipe } = doWork();
pipe
.pipe(
take(1),
tap(() => console.log('continue the pipe...'))
)
.subscribe({
next: () => console.log('in subject next'),
error: () => console.log('in subject error'),
complete: () => console.log('in subject complete'),
});
subject.next(1);
subject.next(2);
CodePudding user response:
There are few things here:
- A
subject
which is a Subject - A
pipe
in thedoWork
function. Suchpipe
is actually a different object fromsubject
. It is an Observable returned by thepipe
function invoked onsubject
.subject
is the source ofpipe
, but it is notpipe
pipe
errors when processing the first notification, given thework
function logic- A subscription to
pipe
When an Observable errors, than it implicitly completes, so it will not notify any other value.
pipe
errors immediately and therefore it will not notify any more, even if its source, which is subject
, continues to notify.
Therefore you can do subject.next(2)
but this notification will not be processed by pipe
since pipe
has already error
ed.
There is one more thing. There is a take(1)
in the bottom pipe
. This means that we take the first notification and then we complete.
Therefore, even if you remove the logic the throws the error, you will not see a second continue the pipe...
message printed on the console since the Observable completes after the first notification.
CodePudding user response:
I will extend on @Picci's answer:
To combat the fact, that when an emitter (Subject
in this case) errors it cannot emit anymore, we can emit an Error
instance instead. Handling such error will require more non-standard code, but it is definitely an easy to read solution to the given problem.