How to serialize execution of array of observables

Time:12-07

I have a validation process which validates data in the table row by row. Because each row validation uses a shared resource, access to it must be serialized.

public validate():Observable<boolean>{
    const rowValidations:Observable<boolean>[] = dataRows.map(row=>this.validateSingleRow(row));
    return forkJoin(...rowValidations).pipe(
      map(results=>results.every(r=>r))
    )
}

If I understand correctly, forkJoin does not wait for each observable to finish before subscribing to the next one, as concat would, so this will probably fail. concat, on the other hand, serializes all the observables into a single stream.
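The difference in subscription order can be checked with a small sketch. It assumes RxJS 6 or later; `tracked` is a hypothetical helper (not part of RxJS) that logs each subscription, and the subjects stand in for row validations that have not finished yet.

```typescript
import { Observable, Subject, concat, forkJoin } from 'rxjs';

// Hypothetical helper: wraps a subject so that every subscription is logged.
function tracked(name: string, log: string[], source: Subject<boolean>): Observable<boolean> {
  return new Observable<boolean>((subscriber) => {
    log.push(`subscribe ${name}`);
    return source.subscribe(subscriber);
  });
}

// forkJoin subscribes to every source as soon as it is subscribed itself.
const forkJoinLog: string[] = [];
forkJoin([
  tracked('row 1', forkJoinLog, new Subject<boolean>()),
  tracked('row 2', forkJoinLog, new Subject<boolean>()),
]).subscribe();

// concat subscribes to the next source only after the previous one completes.
const concatLog: string[] = [];
const row1 = new Subject<boolean>();
concat(
  tracked('row 1', concatLog, row1),
  tracked('row 2', concatLog, new Subject<boolean>()),
).subscribe();
const concatLogBeforeCompletion = [...concatLog];
row1.complete();
const concatLogAfterCompletion = [...concatLog];
```

Here `forkJoinLog` holds both subscriptions right away, while `concatLog` gains the second entry only after `row1` completes.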

How can I get the subscription order of concat but still receive an array with the result of each observable, as with forkJoin, effectively synchronizing the execution of the inner observables (like a synchronized validateSingleRow in Java)?

CodePudding user response:

Would something like this do the trick for you?

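import { Observable, of } from 'rxjs';
import { delay, map, switchMap, tap } from 'rxjs/operators';

class SomeClass {
  dataRows = [1, 2, 3];

  public validate(): Observable<boolean[]> {
    return this.validateSequentially(this.dataRows);
  }

  // Validates the first row, then recurses into the remaining rows
  // only after that validation has emitted its result.
  private validateSequentially<T>([cur, ...rest]: T[]): Observable<boolean[]> {
    return cur !== undefined
      ? this.validateSingleRow(cur).pipe(
          switchMap((x) =>
            this.validateSequentially(rest).pipe(map((arr) => [x, ...arr]))
          )
        )
      : of([]); // no rows left: end the recursion with an empty result
  }

  // Mock
  private validateSingleRow(cur: any) {
    console.log(`Validating ${cur}...`);
    return of(Math.floor(Math.random() * 2) === 1).pipe(
      delay(1000),
      tap((x) => console.log(`Result: ${x}`))
    );
  }
}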

const obj = new SomeClass();

obj.validate().subscribe(console.log);


CodePudding user response:

Actually, if you know that each this.validateSingleRow(row) will always emit exactly once, you can use toArray():

concat(...rowValidations).pipe(
  toArray(),
);

concat guarantees the correct order, and toArray() collects all emissions into a single array and emits it once the source Observable completes.
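A minimal sketch of that behaviour, assuming RxJS 6 or later. The two subjects are stand-ins for row validations that each emit once, and the variable names are only illustrative:

```typescript
import { Subject, concat } from 'rxjs';
import { toArray } from 'rxjs/operators';

const first = new Subject<boolean>();
const second = new Subject<boolean>();
const emissions: boolean[][] = [];

concat(first, second)
  .pipe(toArray())
  .subscribe((results) => emissions.push(results));

first.next(true);
first.complete();
// toArray has not emitted yet because the concatenated stream is still open.
const emissionCountAfterFirst = emissions.length;

second.next(false);
second.complete(); // the whole chain completes, so toArray emits exactly once
```

After the last line, `emissions` contains a single array with the results in source order.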

Otherwise, if validateSingleRow might emit multiple times and you only ever want its last value, you could use scan:

const indexedRowValidations = rowValidations.map((o, index) => o.pipe(
  // Tag every result with the position of its row.
  map(result => [index, result] as const),
));

concat(...indexedRowValidations).pipe(
  // Later emissions for the same row overwrite earlier ones.
  scan((acc, [index, result]) => {
    acc[index] = result;
    return acc;
  }, [] as boolean[]),
  takeLast(1),
);

(I didn't test it but I believe you get the idea :)).

CodePudding user response:

The solution that meets my requirement is simpler than one might think. I used concat with toArray(), like this:

const rowValidations:Observable<boolean>[] = dataRows.map(row=>defer(()=>this.validateSingleRow(row)));
return concat(...rowValidations).pipe(
  toArray(),
  map(results=>results.every(r=>r))
)

This way validateSingleRow runs for one row at a time: defer postpones each call until concat subscribes to that row, and toArray turns the stream of booleans into a single boolean array.
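The role of defer can be illustrated with a small sketch, assuming RxJS 6 or later. The validateSingleRow below is a hypothetical stand-in whose call has a visible side effect (a log entry); in the real code that side effect would be the access to the shared resource.

```typescript
import { Observable, concat, defer, of } from 'rxjs';
import { map, toArray } from 'rxjs/operators';

const calls: string[] = [];

// Hypothetical stand-in: the call itself has a side effect, not just the
// subscription to the returned observable.
function validateSingleRow(row: number): Observable<boolean> {
  calls.push(`validate ${row}`);
  return of(row > 0);
}

const dataRows = [1, 2, 3];

// Without defer, map() invokes validateSingleRow for every row immediately.
dataRows.map((row) => validateSingleRow(row));
const callsAfterEagerMap = [...calls];
calls.length = 0;

// With defer, nothing is invoked while the array is being built.
const rowValidations = dataRows.map((row) => defer(() => validateSingleRow(row)));
const callsAfterDeferredMap = [...calls];

// Each factory runs only when concat subscribes to that row.
const outcomes: boolean[] = [];
concat(...rowValidations)
  .pipe(
    toArray(),
    map((results) => results.every((r) => r)),
  )
  .subscribe((allValid) => outcomes.push(allValid));
```

If validateSingleRow already returns a cold observable that does all its work on subscription, concat alone serializes it; defer matters when the call itself starts the work.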
