I have a validation process that validates the data in a table row by row. Because each row's validation uses a shared resource, access to that resource must be serialized.
public validate(): Observable<boolean> {
  const rowValidations: Observable<boolean>[] = dataRows.map(row => this.validateSingleRow(row));
  return forkJoin(...rowValidations).pipe(
    map(results => results.every(r => r))
  );
}
If I understand correctly, forkJoin subscribes to all of the observables at once instead of waiting for each one to complete before subscribing to the next, as concat would, so this will probably fail. concat, on the other hand, serializes all the observables into a single stream.
How can I get the subscription order of concat but still receive an array with the result of each observable, as with forkJoin, effectively synchronizing the execution of each inner observable (like declaring validateSingleRow as synchronized in Java)?
CodePudding user response:
Would something like this do the trick for you?
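import { Observable, of } from 'rxjs';
import { delay, map, switchMap, tap } from 'rxjs/operators';

class SomeClass {
  dataRows = [1, 2, 3];

  public validate(): Observable<boolean[]> {
    return this.validateSequentially(this.dataRows);
  }

  // Validates the first row, then recurses on the remaining rows once it has emitted.
  private validateSequentially<T>([cur, ...rest]: T[]): Observable<boolean[]> {
    // Compare against undefined so that falsy rows such as 0 are still validated.
    return cur !== undefined
      ? this.validateSingleRow(cur).pipe(
          switchMap((x) =>
            this.validateSequentially(rest).pipe(map((arr) => [x, ...arr]))
          )
        )
      : of([]);
  }

  // Mock: emits a random boolean after one second.
  private validateSingleRow(cur: any): Observable<boolean> {
    console.log(`Validating ${cur}...`);
    return of(Math.floor(Math.random() * 2) === 1).pipe(
      delay(1000),
      tap((x) => console.log(`Result: ${x}`))
    );
  }
}

const obj = new SomeClass();
obj.validate().subscribe(console.log);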
CodePudding user response:
Actually, if you know that each this.validateSingleRow(row) will always emit exactly once, you can use toArray():
concat(...rowValidations).pipe(
toArray(),
);
concat will guarantee the correct order, and toArray() will collect all emissions into a single array and re-emit it after the source Observable completes.
Otherwise, if validateSingleRow might emit multiple times and you only ever want its last value, you could use scan:
const indexedRowValidations = rowValidations.map((o, index) =>
  o.pipe(map((result): [number, boolean] => [index, result]))
);
concat(...indexedRowValidations).pipe(
  // Overwrite the slot for each source so that only its latest value is kept.
  scan((acc, [index, result]) => {
    acc[index] = result;
    return acc;
  }, [] as boolean[]),
  takeLast(1),
);
(I didn't test it but I believe you get the idea :)).
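For the multi-emission case, a possibly simpler variant is to reduce each inner observable to its final value before concatenating, so that toArray() still receives exactly one value per row. The sketch below uses made-up synchronous sources in place of the real validations (the names rowValidations and collected are only illustrative), and it assumes last() is acceptable even though it errors on a source that completes without emitting anything.

```typescript
import { Observable, concat, of } from 'rxjs';
import { last, toArray } from 'rxjs/operators';

// Hypothetical row validations; the second one emits twice before completing.
const rowValidations: Observable<boolean>[] = [
  of(true),
  of(false, true),
  of(false),
];

// Keep only the final emission of each source, subscribe to the sources one
// after another, and gather the results into a single array.
const lastResults$ = concat(...rowValidations.map((o) => o.pipe(last()))).pipe(
  toArray()
);

// The sources are synchronous, so the array is available right after subscribe().
let collected: boolean[] = [];
lastResults$.subscribe((results) => (collected = results));
console.log(collected); // [ true, true, false ]
```

If a row might complete without emitting, takeLast(1) could be swapped in for last(), at the cost of the result array no longer lining up one-to-one with the rows.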
CodePudding user response:
The solution that meets my requirement is simpler than one might think. I used concat with toArray(), like this:
const rowValidations: Observable<boolean>[] = dataRows.map(row => defer(() => this.validateSingleRow(row)));
return concat(...rowValidations).pipe(
  toArray(),
  map(results => results.every(r => r))
);
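defer postpones each call to validateSingleRow until concat subscribes to that row's observable, so validateSingleRow is executed for one row at a time, and toArray transforms the stream of booleans into an array of booleans.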
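To make the effect of defer visible, here is a small sketch with a made-up validateSingleRow that touches the shared resource as soon as it is called, before anyone subscribes. The events log, the dataRows values and the "row > 0" rule are assumptions for illustration only, not part of the real validation.

```typescript
import { Observable, concat, defer } from 'rxjs';
import { map, toArray } from 'rxjs/operators';

// Records the order in which the shared resource is acquired and released.
const events: string[] = [];

// Hypothetical validator: it starts its work at call time and finishes it
// only when the returned observable is subscribed to.
function validateSingleRow(row: number): Observable<boolean> {
  events.push(`start ${row}`);
  return new Observable<boolean>((subscriber) => {
    events.push(`finish ${row}`);
    subscriber.next(row > 0);
    subscriber.complete();
  });
}

const dataRows = [1, 2, 3];

function validate(): Observable<boolean> {
  // defer delays each validateSingleRow call until concat reaches that row.
  const rowValidations = dataRows.map((row) =>
    defer(() => validateSingleRow(row))
  );
  return concat(...rowValidations).pipe(
    toArray(),
    map((results) => results.every((r) => r))
  );
}

const outcomes: boolean[] = [];
validate().subscribe((ok) => outcomes.push(ok));
console.log(events.join(', '));
// start 1, finish 1, start 2, finish 2, start 3, finish 3
console.log(outcomes); // [ true ]
```

Without the defer wrapper, dataRows.map would call validateSingleRow for every row up front, so with this mock all three "start" entries would be logged before the first "finish".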