RxJS concat: only emit on complete / the missing RxJS concatJoin

Time:08-24

I've got an array of Observables and I'm trying to do 2 things:

(1) the subscribe block's next function should only be called once, when everything is done, with an array of all results.

(2) the Observables must be fired in sequence, not all at the same time.

Here's my code so far, where (1) is NOT working and (2) is working:

const nums = [1, 2, 3];

const obs$ = nums.map((num) => {
  return rxjs.defer(() => {
    // this is an async op, mimicking an HTTP call
    console.log('firing', num);
    return rxjs.from(
      new Promise((resolve) => setTimeout(() => resolve(num), 2000))
    );
  });
});

rxjs.concat(...obs$).pipe(
  // do something here?
).subscribe(
  num => { console.log('next', num) },
  err => { console.error('error', err.message) },
  () => { console.log('complete') }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>

Actual output:

at 0 secs ... firing 1
at 2 secs ... next 1
at 2 secs ... firing 2
at 4 secs ... next 2
at 4 secs ... firing 3
at 6 secs ... next 3
at 6 secs ... complete

Desired output:

at 0 secs ... firing 1
at 2 secs ... firing 2
at 4 secs ... firing 3
at 6 secs ... next [1, 2, 3]
at 6 secs ... complete

Note the timings are important here.

Now I know forkJoin combines results like this. The problem with forkJoin is that firing 1, firing 2, and firing 3 all happen at the same time, but I need them to fire sequentially, which is why I am using concat.

How can I achieve the desired output while preserving the desired timings above for firing 1, firing 2, and firing 3?

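Answer:

Use toArray inside the pipe of concat. concat still subscribes to each Observable in sequence, and toArray buffers every emitted value and emits them as a single array once the last Observable completes, so next is called only once, after all of them are done.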

const nums = [1, 2, 3];

const obs$ = nums.map((num) => {
  return rxjs.defer(() => {
    // this is an async op, mimicking an HTTP call
    console.log('firing', num);
    return rxjs.from(
      new Promise((resolve) => setTimeout(() => resolve(num), 500))
    );
  });
});

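rxjs.concat(...obs$).pipe(
  // collect every emitted value and emit one array on completion
  rxjs.toArray(),
).subscribe({
  // an observer object replaces the separate-callback signature deprecated in RxJS 7
  next: (results) => {
    console.log('next', results)
  },
  error: (err) => {
    console.error('error', err.message)
  },
  complete: () => {
    console.log('complete')
  }
})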
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>
