Home > Software design >  First argument of concat doesn't seem to be completing
First argument of concat doesn't seem to be completing

Time:04-01

I am trying to run concat on two observables, however, the second observable in the concat is not running and I am not sure why. The only thing that happens is done from the first observable prints out. I have created a simplified version of my code on Stackblitz.

Here is the start of the script, it runs the first observable in the concat but not the second, and the completion message never executes:

function refresh() {
  const files = new Files();
  const deleteExisting = files.deleteExisting();
  const saveFiles = files.query().pipe(tap((i) => console.log(i)));

  concat(deleteExisting, saveFiles).subscribe({
    complete: () => console.log('Concat done!'),
  });
}

refresh();

Next I have a class that queries the database and streams the files one-by-one it checks if the file is on the os if not it adds it to the deletion list (to be deleted later). Once this finished done is printed out to the console which is expected.

export class Files extends DB {
  deleteExisting() {
    const toDelete: string[] = [];
    return this.stream().pipe(
      mergeMap((file) =>
        from(this.pathExists(file)).pipe(map((i) => [file, i]))
      ),
      tap<[string, boolean]>(
        ([file, exists]) => exists === false && toDelete.push(file)
      ),
      toArray(),
      map(() => toDelete.length),
      finalize(() => {
        console.log('done');
      })
    );
  }

  // The real code does `fs.pathExists()` from fs-extra which returns a promise
  pathExists(path: string) {
    return Promise.resolve(true);
  }
}

Here we have a fake database class, the actual data comes from sqlite.

  • The stream() method returns results one-by-one (this is done by the sqlite module) and I relay that to the subject.
  • The query() method returns results as an array, and I relay that to the subject.
export class DB {
  private results1 = ['a', 'b', 'c'];
  private results2 = [1, 2, 3];

  stream() {
    const sub = new Subject<string | number>();
    // Simulate getting rows one-by-one from the db
    setTimeout(() => {
      // Runs a db call sqlite supports an `each` function which basically does this:
      this.results1.forEach((row) => sub.next(row));
      // Once each finishes a callback is called returning the number of results
      // This line is simulating that
      sub.next(this.results1.length);
    });
    return sub
      .asObservable()
      .pipe(takeWhile<string>((i) => typeof i === 'string'));
  }

  query() {
    const sub = new Subject<number[]>();
    // Simulate a callback that returns all results from database as an array
    setTimeout(() => sub.next(this.results2), 100);
    return sub.asObservable().pipe(take(1));
  }
}

CodePudding user response:

The problem is in the query() method.

When subscribing to a plain Subject, you will only receive the notifications emitted after the moment of the subscription. And in your case your just emitting once in the subject and is way before subscription.

To test it you could use the of operator to return the values of results2

query() {
   return of(this.results2);
}

And you'll see the array gets printed.

Cheers

CodePudding user response:

So the issue was that I was running the query before calling concat making it execute too soon. Wrapping the query within an Observable so it runs when concat calls it instead of right away fixes the issue:

  const saveFiles = new Observable((sub) => {
    files.query().subscribe((i) => sub.next(i));
  }).pipe(tap((i) => console.log(i)));

defer also seems to work as well.

  const saveFiles = defer(() => files.query())
    .pipe(tap((i) => console.log(i)))
  • Related