RxJS Waits until Subscribe Observable Finish

Time:09-24

I'm trying to implement gRPC with NestJS, using an RxJS Observable to handle server streaming.

In the code below, I try to push each value from the observable into the results array. The function findAllRepos returns an empty list, even though the console.log(value) inside subscribe prints every result correctly.

  findAllRepos(): Repository[] {
    const results: Repository[] = [];

    const observable = this.mainService.findAllRepos({});
    observable.subscribe({
      next: (value) => {
        console.log(value);
        results.push(value);
      },
      error: (err) => console.log(err),
      complete: () => console.log(results)
    });

    return results;
  }

I think the problem is that the function returns before the subscription finishes. Is there any way to solve this? Thanks!

CodePudding user response:

The Problem

I think the problem is that the function returns before the subscription finishes.

The functions (next, error, and complete) are all called by the observable (not by you) at some time in the future. If you dig into the guts of RxJS you can gain some control over how these functions are called, but to keep things simple it's best to imagine that this is handled opaquely by the RxJS library.

As such, there's no way to do what you want: findAllRepos cannot return the finished array synchronously.
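To make the timing concrete, here is a minimal sketch of the same pattern with the order of events recorded. The names streamNumbers, collectNow, and events are made up for illustration, and scheduled(..., asyncScheduler) is only an assumed stand-in for an asynchronous source such as a gRPC server stream.

```typescript
import { Observable, asyncScheduler, scheduled } from 'rxjs';

// Hypothetical stand-in for an asynchronous stream (e.g. a gRPC server stream).
function streamNumbers(): Observable<number> {
  return scheduled([1, 2, 3], asyncScheduler);
}

// Records the order in which things happen.
const events: string[] = [];

// Same shape as the findAllRepos in the question: subscribe, then return at once.
function collectNow(): number[] {
  const results: number[] = [];
  streamNumbers().subscribe({
    next: (value) => {
      results.push(value);
      events.push(`next ${value}`);
    },
    complete: () => {
      events.push('complete');
    },
  });
  // This line runs before any next callback has been invoked.
  events.push(`returned with ${results.length} items`);
  return results;
}

const returned = collectNow();
```

The first recorded event is the return, with zero items. The same array object does get filled in later, but the caller has no signal telling it when that has happened, which is why the function needs to hand back something asynchronous instead.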

The problem is even worse than that, though. The problem you encountered exists with any interaction between synchronous and asynchronous code: you cannot run asynchronous code synchronously. In a single-threaded environment (such as JavaScript), making a synchronous piece of code wait in a loop blocks the entire program, because the asynchronous callback it is waiting for never gets a chance to run.

Consider the code below. You may expect it to output "a equals 0" for 1000ms and then start outputting "a equals 1" forever thereafter. What actually happens, however, is that the code inside the setTimeout never gets a chance to run, since the thread is stuck in an infinite loop printing "a equals 0".

// Start with a = 0
let a = 0;
// After 1 second, set a = 1
setTimeout(() => a = 1, 1000);

// loop forever and print something based on value of a
while (true) {
  if (a === 0) console.log("a equals 0");
  else console.log("a equals 1");
}
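For contrast, here is a rough sketch of a non-blocking variant of the loop above. It uses a shorter delay, and the sleep and watch helpers are hypothetical names introduced only for this example. Because the loop awaits between checks, control returns to the event loop and the timer callback gets its turn.

```typescript
// State shared between the timer callback and the polling code.
let a = 0;
const log: string[] = [];

// After 20 ms, set a = 1 (same idea as the setTimeout above, shorter delay).
setTimeout(() => {
  a = 1;
}, 20);

// Hypothetical helper: resolves after `ms` milliseconds without blocking.
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Polls `a`, yielding to the event loop between checks so the timer can fire.
async function watch(): Promise<string[]> {
  while (a === 0) {
    log.push('a equals 0');
    await sleep(5);
  }
  log.push('a equals 1');
  return log;
}

const done = watch();
```

Note that watch returns a promise rather than a plain value; the caller still has to await it or attach a then callback.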

The Solution

The two most popular ways to manage asynchronous code are promises and observables. If you want a function to return something asynchronously, then have it return a promise or an observable.

In your case:

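// At the top of the file:
import { Observable } from 'rxjs';
import { tap, toArray } from 'rxjs/operators';

// Inside the class:
findAllRepos(): Observable<Repository[]> {
  const observable = this.mainService.findAllRepos({});
  return observable.pipe(
    // Log each repository as it arrives
    tap(value => console.log("Repository value being added to array: ", value)),
    // Collect every emitted value into one array, emitted when the source completes
    toArray(),
    tap({
      next: value => console.log("Result Array (Repository[]) : ", value),
      error: console.log,
      complete: () => console.log("findAllRepos observable complete")
    })
  );
}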

Then to get the actual value elsewhere:

findAllRepos().subscribe(repositories => {
  /* Do something with your array of repositories */
});
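If the caller would rather work with a promise, one possible sketch (not part of the code above) converts the stream with lastValueFrom, which is available from RxJS 7 onward. The Repository shape and the streamRepos function are hypothetical stand-ins for this.mainService.findAllRepos({}).

```typescript
import { Observable, from, lastValueFrom } from 'rxjs';
import { toArray } from 'rxjs/operators';

// Hypothetical shape, for illustration only.
interface Repository {
  name: string;
}

// Hypothetical stand-in for this.mainService.findAllRepos({}).
function streamRepos(): Observable<Repository> {
  return from([{ name: 'a' }, { name: 'b' }]);
}

// Resolves with all repositories once the stream completes.
function findAllRepos(): Promise<Repository[]> {
  // toArray emits a single array on completion (an empty one for an empty
  // stream), so lastValueFrom always has a value to resolve with.
  return lastValueFrom(streamRepos().pipe(toArray()));
}
```

A caller inside an async function can then write const repositories = await findAllRepos(); the wait is still asynchronous, it just reads sequentially.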

CodePudding user response:

It's not good practice to subscribe inside methods like this; it makes reactive code non-reactive. Why not use something like:

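findAllRepos(): Observable<Repository[]> {
    return this.mainService.findAllRepos({}).pipe(
        // Collect the streamed values so the emitted type matches Repository[]
        toArray(),
        catchError((e) => {
            console.log('e ', e);
            // catchError must return an observable or throw; rethrow so callers still see the error
            throw e;
        }),
        // finalize runs on completion, on error, and on unsubscribe
        finalize(() => {
            console.log('completed');
        }),
    );
}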

and then call findAllRepos().subscribe in the client? This gives you more flexibility: you can call the method from multiple places and map the result however you want for each use case.
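As a rough illustration of that flexibility, the sketch below has two callers deriving different results from the same method. The Repository fields and the of(...) source are assumptions made up for the example; of emits synchronously, which a real gRPC stream would not.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical shape, for illustration only.
interface Repository {
  name: string;
  stars: number;
}

// Hypothetical stand-in for the findAllRepos() method.
function findAllRepos(): Observable<Repository[]> {
  return of([
    { name: 'api', stars: 3 },
    { name: 'web', stars: 5 },
  ]);
}

// Caller 1 only needs the names.
const names$ = findAllRepos().pipe(map((repos) => repos.map((r) => r.name)));

// Caller 2 only needs an aggregate.
const totalStars$ = findAllRepos().pipe(
  map((repos) => repos.reduce((sum, r) => sum + r.stars, 0)),
);

const seen: { names?: string[]; total?: number } = {};
names$.subscribe((names) => {
  seen.names = names;
});
totalStars$.subscribe((total) => {
  seen.total = total;
});
```

Each caller decides for itself when to subscribe and how to transform the array, while the method stays free of subscriptions.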
