Observable from array with mergeMap: finalize never called

Time:08-25

I have the following code:

// Code is in a state class for NGXS
const someObject = ctx.getState().someObject;
const sourceArray: SomeType[] = [/* content */];

return from(sourceArray).pipe(
  tap((arrayElem) => ctx.patchState({currentElem: arrayElem})),
  switchMap((arrayElem) => {
    return this.someService.someMethod(arrayElem);
  }),
  catchError((err) => {
    return of(err);
  }),
  finalize(() => {
    console.log("Test"); // Never called
  })
);


someMethod(request) {
    return this.someClientAsync().pipe(
      switchMap((theClient) => {
        return new Observable<ReturnType>(subscriber => {
            theClient.makeCall(request, (err, data) => {
            if (err) {
              subscriber.error(err)
            }
            subscriber.next(data);
          });
        });
      })
    );
  }

The problem is: the code in the switchMap operator is called correctly, for every array element, but the finalize method is never called.

It may be due to the code in someMethod: there, I had to wrap code that uses callbacks in an observable. The object returned by someClientAsync() exposes its methods through this callback-style API.

But I am not sure. The way I wrapped the callback into the observable works for other methods in that service. And as I wrote, switchMap is actually called n times, n being the array length.

Thanks for any help!

CodePudding user response:

According to the RxJS docs (https://rxjs.dev/api/operators/finalize), finalize is called when the observable completes or errors, and also when the subscriber explicitly unsubscribes.

In your situation, you are catching all errors, so the observable never errors. From your example it is unclear whether this.someService.someMethod(arrayElem) ever completes; if it does not, the resulting observable never completes either. Unless something unsubscribes, the function inside finalize is therefore never called.
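
To see this in isolation, here is a minimal sketch (TypeScript, RxJS 6 or 7). The helpers neverCompletes, completes and run are made up for illustration and only stand in for someMethod; everything is synchronous so the order of events can be read straight from the log.

```typescript
import { Observable, from } from "rxjs";
import { finalize, switchMap } from "rxjs/operators";

// Hypothetical stand-in for someMethod: emits one value but never completes.
const neverCompletes = (x: number) =>
  new Observable<number>((subscriber) => {
    subscriber.next(x);
  });

// Same, but signals completion after emitting.
const completes = (x: number) =>
  new Observable<number>((subscriber) => {
    subscriber.next(x);
    subscriber.complete();
  });

// Runs the pipeline from the question and logs what happens, in order.
function run(inner: (x: number) => Observable<number>): string[] {
  const log: string[] = [];
  const subscription = from([1, 2, 3])
    .pipe(
      switchMap(inner),
      finalize(() => log.push("finalize"))
    )
    .subscribe((value) => log.push(`next ${value}`));
  log.push("unsubscribe");
  subscription.unsubscribe();
  return log;
}

// finalize only runs once the subscription is torn down by hand:
// ["next 1", "next 2", "next 3", "unsubscribe", "finalize"]
const stuck = run(neverCompletes);

// finalize runs as soon as the last inner observable completes:
// ["next 1", "next 2", "next 3", "finalize", "unsubscribe"]
const done = run(completes);
```

In the first run the outer observable is still waiting for the last inner observable when unsubscribe() is called, which is the situation described in the question.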

What to do to solve it?

Again, it is unclear from your example what you are trying to achieve. However, I will make some assumptions.

  • If you only want the first value of this.someService.someMethod(arrayElem); and then let it complete, you can do:
switchMap((arrayElem) => {
    return this.someService.someMethod(arrayElem).pipe(take(1));
  })
  • If you want to enforce that this.someService.someMethod(arrayElem); emits a value within some time (e.g. you want to recover from an HTTP request that did not receive a response), you can use:
from(sourceArray).pipe(
  switchMap((arrayElem) => this.someService.someMethod(arrayElem).pipe(take(1))),
  timeout(timeoutTimeInMs),
  finalize(() => console.log("Test")) // Certainly called
);
  • If you do not want to enforce a value, but can emit a dummy value when the inner observable does not emit in time (e.g. an HTTP request that you cannot wait too long for, but that should not throw an error when it is slow):
from(sourceArray).pipe(
  switchMap((arrayElem) => this.someService.someMethod(arrayElem).pipe(
    take(1), 
    timeout(timeoutTimeInMs),
    catchError(() => of(dummyValue)) 
  )),
  finalize(() => console.log("Test")) // Certainly called
);
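
As a quick check of the take(1) variant, the sketch below uses a made-up someMethod that emits once and then stays open, which is an assumption about how the real service behaves. The timeout variants are left out here because they need timers.

```typescript
import { Observable, from } from "rxjs";
import { finalize, switchMap, take } from "rxjs/operators";

// Hypothetical stand-in for someMethod: emits once and then stays open forever.
const someMethod = (elem: string) =>
  new Observable<string>((subscriber) => {
    subscriber.next(elem.toUpperCase());
  });

const events: string[] = [];

from(["a", "b"])
  .pipe(
    // take(1) completes each inner observable right after its first value.
    switchMap((elem) => someMethod(elem).pipe(take(1))),
    finalize(() => events.push("finalize"))
  )
  .subscribe({
    next: (value) => events.push(value),
    complete: () => events.push("complete"),
  });

// events is now ["A", "B", "complete", "finalize"]
```

Without take(1) the log would stop at "B", because the last inner observable never completes.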

Btw: switchMap is similar to mergeMap, but not the same. If switchMap receives a second value before the inner observable of the first value completes, it unsubscribes from the first inner observable and subscribes to the second. mergeMap subscribes to both observables in the same situation. concatMap waits for the first one to complete and only then subscribes to the second observable.
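
The difference between the three operators can be sketched as follows. The run helper and the Key and Project types are invented for this example; the inner observables are Subjects so that the emission order can be driven by hand.

```typescript
import { Observable, OperatorFunction, Subject, from } from "rxjs";
import { concatMap, mergeMap, switchMap } from "rxjs/operators";

type Key = "a" | "b";
type Project = (key: Key) => Observable<string>;

// Feeds the keys "a" and "b" through the given flattening operator and
// returns everything the subscriber received.
function run(flatten: (project: Project) => OperatorFunction<Key, string>): string[] {
  const inners: Record<Key, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>(),
  };
  const keys: Key[] = ["a", "b"];
  const seen: string[] = [];

  from(keys)
    .pipe(flatten((key) => inners[key]))
    .subscribe((value) => seen.push(value));

  // Both keys have arrived; now drive the inner observables by hand.
  inners.a.next("a1");
  inners.b.next("b1");
  inners.a.complete();
  inners.b.next("b2");
  inners.b.complete();
  return seen;
}

// "a" was dropped as soon as "b" arrived: ["b1", "b2"]
const withSwitchMap = run((project) => switchMap(project));

// Both inner observables are active at once: ["a1", "b1", "b2"]
const withMergeMap = run((project) => mergeMap(project));

// "b" is only subscribed after "a" completes: ["a1", "b2"]
const withConcatMap = run((project) => concatMap(project));
```

"b1" is missing from the concatMap result only because a Subject does not replay values to late subscribers; a cold inner observable would start from its first value once concatMap subscribes to it.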

CodePudding user response:

Can you change someMethod so that it simply returns the result of makeCall, like so? The Observable wrapper is not necessary if makeCall can return an observable!

someMethod(request) {
    return this.someClientAsync().pipe(
      switchMap((theClient) => {
        return theClient.makeCall(request);
      })
    );
  }

CodePudding user response:

finalize() is a dispose handler that is called when a chain is being disposed. This happens after the chain has terminated with a complete or error notification, or when the chain is unsubscribed.

In your case you never complete the merged Observable:

return new Observable<ReturnType>(subscriber => {
  theClient.makeCall(request, (err, data) => {
    if (err) {
      subscriber.error(err)
    }
    subscriber.next(data);
  });
});

This is actually creating a memory leak. You need to properly complete the Observable if you're done emitting.

return new Observable<ReturnType>(subscriber => {
  theClient.makeCall(request, (err, data) => {
    if (err) {
      subscriber.error(err);
      return;
    }
    subscriber.next(data);
    subscriber.complete(); // <---
  });
});

Btw, ideally you should also return a dispose function from new Observable() to cancel the request created with theClient.makeCall(request, ...) but maybe it's not possible in your case.
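
A minimal sketch of such a wrapper with a dispose function could look like this. The Client interface, the cancel handle returned by makeCall, and the names callAsObservable and fakeClient are all assumptions for illustration; the real client may not offer any way to cancel a call.

```typescript
import { Observable } from "rxjs";
import { finalize } from "rxjs/operators";

type Callback = (err: Error | null, data?: string) => void;

// Hypothetical callback-style client whose makeCall returns a cancel handle.
interface Client {
  makeCall(request: string, callback: Callback): { cancel(): void };
}

function callAsObservable(client: Client, request: string): Observable<string> {
  return new Observable<string>((subscriber) => {
    const call = client.makeCall(request, (err, data) => {
      if (err) {
        subscriber.error(err);
        return;
      }
      subscriber.next(data as string);
      subscriber.complete();
    });
    // Dispose function: runs on complete, on error and on unsubscribe.
    return () => call.cancel();
  });
}

// Fake client for the demo: keeps the callback so the reply can be sent by hand.
const state: { reply: Callback; cancelled: number } = {
  reply: () => {},
  cancelled: 0,
};
const fakeClient: Client = {
  makeCall(_request, callback) {
    state.reply = callback;
    return { cancel: () => { state.cancelled += 1; } };
  },
};

const log: string[] = [];
callAsObservable(fakeClient, "ping")
  .pipe(finalize(() => log.push("finalize")))
  .subscribe((value) => log.push(value));
state.reply(null, "pong"); // log is now ["pong", "finalize"]

// Unsubscribing before a reply arrives cancels the pending call.
const pending = callAsObservable(fakeClient, "slow").subscribe();
pending.unsubscribe();
```

In this sketch cancel() is also invoked after a normal completion, so a real cancel implementation would need to tolerate being called on a call that has already finished.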
