How to run subject after error happens in rxjs?

Time:05-12

Why does subject.next(2) not work? I know it may happen because the stream has errored. But I added catchError and returned EMPTY in the pipeline, and that makes the stream complete.

So it seems catchError is not helping me either.

How can I make subject.next(2) work?

My code calls the doWork function, which in turn invokes the work function. If the argument is 1, I simulate an error on that first call; after that there should be no error.

Then I expose the subject and the subject's pipeline so that something else can run after the pipeline is invoked. I use share for that (to avoid duplicating my flow), and it should happen only once.


import { EMPTY, of, Subject, throwError } from 'rxjs';
import { exhaustMap, share, take, tap } from 'rxjs/operators';

console.clear();

const doWork = () => {
  const subject = new Subject();

  const work = (n) => {
    console.log('in work fn', n);
    if (n === 1) {
      throw new Error('bla');
    }
    return of([]);
  };

  const pipe = subject.pipe(
    exhaustMap((n) => work(n)),
    share()
  );

  pipe.subscribe({
    next: () => console.log('in subjectPipe next'),
    error: () => console.log('in subjectPipe error'),
    complete: () => console.log('in subjectPipe complete'),
  });

  return { subject, pipe };
};

const { subject, pipe } = doWork();

pipe
  .pipe(
    take(1),
    tap(() => console.log('continue the pipe...'))
  )
  .subscribe({
    next: () => console.log('in subject next'),
    error: () => console.log('in subject error'),
    complete: () => console.log('in subject complete'),
  });

subject.next(1);
subject.next(2);

CodePudding user response:

There are a few things here:

  1. A subject, which is a Subject.
  2. A pipe in the doWork function. This pipe is a different object from subject: it is the Observable returned by calling the pipe method on subject. subject is the source of pipe, but it is not pipe.
  3. pipe errors when processing the first notification, given the logic of the work function.
  4. A subscription to pipe.
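The distinction between subject and the Observable derived from it can be seen in a minimal sketch. The names derived and log are illustrative and not part of the question's code, and map stands in for exhaustMap only to keep the example short.

```typescript
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

const log: string[] = [];
const subject = new Subject<number>();

// derived is a new Observable whose source is subject; it is not subject itself.
const derived = subject.pipe(
  map((n) => {
    if (n === 1) {
      throw new Error('bla'); // simulated failure, as in the question
    }
    return n;
  })
);

// One subscriber listens to subject directly, the other to the derived Observable.
subject.subscribe((n) => log.push(`subject saw ${n}`));
derived.subscribe({
  next: (n) => log.push(`derived saw ${n}`),
  error: () => log.push('derived errored'),
});

subject.next(1);
subject.next(2);

console.log(log);
```

In this sketch the direct subscriber should still receive 2, while the derived subscription ends with the error raised for 1.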

When an Observable errors, it terminates: the error callback runs (not the complete callback), and it will not emit any further values.

pipe errors immediately, so it will not emit anything more, even though its source, subject, continues to emit.

Therefore you can call subject.next(2), but pipe will not process this notification because it has already errored.
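One possible way to keep the outer stream alive, offered as a sketch rather than as the definitive fix, is to catch the failure inside the inner Observable instead of on the outer pipeline. A catchError placed on the outer pipeline swaps the whole stream for EMPTY, which is why the question saw it complete. The helper run and its guarded flag are hypothetical names for this illustration; defer is used on the assumption that work throws synchronously, as it does in the question.

```typescript
import { defer, EMPTY, Observable, of, Subject } from 'rxjs';
import { catchError, exhaustMap } from 'rxjs/operators';

// Same behaviour as the question's work function: throws synchronously for n === 1.
const work = (n: number): Observable<number[]> => {
  if (n === 1) {
    throw new Error('bla');
  }
  return of([n]);
};

// Hypothetical helper: builds a pipeline, pushes 1 and 2, and returns
// everything the subscriber observed.
const run = (guarded: boolean): string[] => {
  const log: string[] = [];
  const subject = new Subject<number>();

  subject
    .pipe(
      exhaustMap((n) =>
        guarded
          ? // defer turns the synchronous throw into an error of the inner
            // Observable, which catchError then replaces with EMPTY.
            defer(() => work(n)).pipe(catchError(() => EMPTY))
          : work(n)
      )
    )
    .subscribe({
      next: (value) => log.push(`next ${JSON.stringify(value)}`),
      error: () => log.push('error'),
      complete: () => log.push('complete'),
    });

  subject.next(1);
  subject.next(2);
  return log;
};

const unguardedLog = run(false);
const guardedLog = run(true);
console.log(unguardedLog);
console.log(guardedLog);
```

In the unguarded run the subscriber should see only the error; in the guarded run the failure for 1 is swallowed and the value for 2 arrives. Swallowing errors silently is a design choice that may not suit every application.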

There is one more thing. There is a take(1) in the bottom pipe. This means that the Observable takes the first notification and then completes.

Therefore, even if you remove the logic that throws the error, you will not see a second "continue the pipe..." message printed on the console, since the Observable completes after the first notification.
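The effect of take(1) can be checked in isolation with a small sketch that has no failing work function at all. The log array is an illustrative stand-in for the console output.

```typescript
import { Subject } from 'rxjs';
import { take, tap } from 'rxjs/operators';

const log: string[] = [];
const subject = new Subject<number>();

subject
  .pipe(
    take(1), // forward the first notification, then complete
    tap(() => log.push('continue the pipe...'))
  )
  .subscribe({
    next: (n) => log.push(`next ${n}`),
    complete: () => log.push('complete'),
  });

subject.next(1);
subject.next(2); // arrives after completion, so nothing downstream runs

console.log(log);
```

Here the message should appear once, followed by the completion, no matter how many values subject emits afterwards.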

CodePudding user response:

I will expand on @Picci's answer:

To work around the fact that once an emitter (here, the pipeline built on the Subject) errors it cannot emit anymore, we can emit an Error instance as a regular value instead. Handling such an error requires more non-standard code, but it is an easy-to-read solution to the given problem.
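A minimal sketch of that idea, assuming work can be changed to return the failure as a value rather than throw it. The union return type and the instanceof check in the subscriber are illustrative choices, not something taken from the question's code.

```typescript
import { Observable, of, Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical variant of work: a failure is emitted as an Error value,
// so the stream itself never receives an error notification.
const work = (n: number): Observable<number[] | Error> => {
  if (n === 1) {
    return of(new Error('bla'));
  }
  return of([n]);
};

const subject = new Subject<number>();

subject.pipe(exhaustMap((n) => work(n))).subscribe({
  next: (result) => {
    // The "non-standard" part: every consumer has to tell errors from results.
    if (result instanceof Error) {
      log.push(`failed: ${result.message}`);
    } else {
      log.push(`ok: ${JSON.stringify(result)}`);
    }
  },
  error: () => log.push('error'),
});

subject.next(1);
subject.next(2);

console.log(log);
```

With this shape both notifications should reach the subscriber: the first as a failed result, the second as a normal one.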
