I am trying to run concat
on two observables, however, the second observable in the concat
is not running and I am not sure why. The only thing that happens is done
from the first observable prints out. I have created a simplified version of my code on Stackblitz.
Here is the start of the script, it runs the first observable in the concat but not the second, and the completion message never executes:
function refresh() {
const files = new Files();
const deleteExisting = files.deleteExisting();
const saveFiles = files.query().pipe(tap((i) => console.log(i)));
concat(deleteExisting, saveFiles).subscribe({
complete: () => console.log('Concat done!'),
});
}
refresh();
Next I have a class that queries the database and streams the files one-by-one it checks if the file is on the os if not it adds it to the deletion list (to be deleted later). Once this finished done
is printed out to the console which is expected.
export class Files extends DB {
deleteExisting() {
const toDelete: string[] = [];
return this.stream().pipe(
mergeMap((file) =>
from(this.pathExists(file)).pipe(map((i) => [file, i]))
),
tap<[string, boolean]>(
([file, exists]) => exists === false && toDelete.push(file)
),
toArray(),
map(() => toDelete.length),
finalize(() => {
console.log('done');
})
);
}
// The real code does `fs.pathExists()` from fs-extra which returns a promise
pathExists(path: string) {
return Promise.resolve(true);
}
}
Here we have a fake database class, the actual data comes from sqlite.
- The
stream()
method returns results one-by-one (this is done by the sqlite module) and I relay that to the subject. - The
query()
method returns results as an array, and I relay that to the subject.
export class DB {
private results1 = ['a', 'b', 'c'];
private results2 = [1, 2, 3];
stream() {
const sub = new Subject<string | number>();
// Simulate getting rows one-by-one from the db
setTimeout(() => {
// Runs a db call sqlite supports an `each` function which basically does this:
this.results1.forEach((row) => sub.next(row));
// Once each finishes a callback is called returning the number of results
// This line is simulating that
sub.next(this.results1.length);
});
return sub
.asObservable()
.pipe(takeWhile<string>((i) => typeof i === 'string'));
}
query() {
const sub = new Subject<number[]>();
// Simulate a callback that returns all results from database as an array
setTimeout(() => sub.next(this.results2), 100);
return sub.asObservable().pipe(take(1));
}
}
CodePudding user response:
The problem is in the query()
method.
When subscribing to a plain Subject
, you will only receive the notifications emitted after the moment of the subscription. And in your case your just emitting once in the subject and is way before subscription.
To test it you could use the of
operator to return the values of results2
query() {
return of(this.results2);
}
And you'll see the array gets printed.
Cheers
CodePudding user response:
So the issue was that I was running the query before calling concat
making it execute too soon. Wrapping the query within an Observable so it runs when concat calls it instead of right away fixes the issue:
const saveFiles = new Observable((sub) => {
files.query().subscribe((i) => sub.next(i));
}).pipe(tap((i) => console.log(i)));
defer
also seems to work as well.
const saveFiles = defer(() => files.query())
.pipe(tap((i) => console.log(i)))