RxJS concat not executing observables sequentially

Time:08-20

I have 3 RxJS observables which I want to execute sequentially. The 2nd Observable should not start executing until the 1st Observable has finished.

Here's my simplified RxJS code.

const nums = [1, 2, 3]

const obs$ = nums.map(num => {
  const promise = new Promise(resolve => setTimeout(() => resolve(num), 2000))
  return rxjs.from(promise)
})

rxjs.concat(...obs$).subscribe(
  num => { console.log('next', num) },
  err => { console.error('error', err.message) },
  () => { console.log('complete') }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>

This is the expected output with expected timings:

next 1 (at 2 secs)
next 2 (at 4 secs)
next 3 (at 6 secs)
complete (at 6 secs)

This is the actual output with actual timings:

next 1 (at 2 secs)
next 2 (at 2 secs)
next 3 (at 2 secs)
complete (at 2 secs)

As you can see, the timings are not as expected. How do I correct this so that the Observables execute sequentially? Note that there could be any number of Observables, so a solution that works for exactly 3 Observables is not acceptable.

Note that the setTimeout in my simplified code is not there in real life. It just mimics an async operation (e.g. an HTTP call).

Many thanks for your help.

CodePudding user response:

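The issue lies in how the Observables are created, not in concat. concat works as expected: it subscribes to the Observables in order, moving on to the next one only after the previous one has completed. A Promise, however, starts running as soon as it is created, so all three timers are already ticking before concat subscribes to anything. We need to create each Observable so that it only fires the HTTP call (or any other code) when it is subscribed to. The defer function helps with that: https://rxjs.dev/api/index/function/defer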

With Angular's HTTP Client this is not an issue, since it also defers the execution of the call until the actual subscription.
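The difference between an eagerly created Promise and a deferred one can be seen without RxJS at all. The following is a minimal sketch in plain JavaScript; `log`, `eager` and `lazy` are hypothetical names used only for illustration. A Promise's executor runs at construction time, while wrapping the construction in a function (roughly what defer does for each subscription) postpones it until that function is called.

```javascript
// Records the moment each Promise executor runs.
const log = [];

// Eager: the executor runs immediately, inside `new Promise(...)`.
const eager = [1, 2, 3].map(num =>
  new Promise(resolve => {
    log.push(`eager ${num}`);
    resolve(num);
  })
);
const afterEager = log.length; // 3: all three have already started

// Lazy: nothing runs until the factory function is called.
const lazy = [1, 2, 3].map(num => () =>
  new Promise(resolve => {
    log.push(`lazy ${num}`);
    resolve(num);
  })
);
const afterLazy = log.length; // still 3: no lazy work has started

lazy[0](); // starts only the first one
const afterFirstCall = log.length; // 4
```

In the question's code the Promises are of the eager kind, which is why all three timers finish at the 2 second mark.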

To demonstrate the correct order of execution, I added a console.log inside the Observable and gave the later items shorter delays, so any out-of-order execution would be visible.

const nums = [1, 2, 3]

const obs$ = nums.map(num => {
  return rxjs.defer(() => {
    // Runs only when concat subscribes to this Observable
    console.log('order', num);
    return rxjs.from(new Promise(resolve => setTimeout(() => resolve(num), 2000 - 500 * num)))
  })
})

rxjs.concat(...obs$).subscribe(
  num => { console.log('next', num) },
  err => { console.error('error', err.message) },
  () => { console.log('complete') }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>
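As a rough way to check the timings by hand, the arithmetic behind concat can be modelled in a few lines of plain JavaScript. This is only a sketch under simplifying assumptions (it ignores scheduling overhead and does not use RxJS), and `emissionTimes` is a hypothetical helper, not part of any library. With eager sources every timer starts at time 0; with deferred sources each timer starts when the previous source completes.

```javascript
// Hypothetical helper: models when concat emits each value, in milliseconds.
// `delays[i]` is how long source i takes once its work has started.
function emissionTimes(delays, lazy) {
  const times = [];
  let previous = 0; // time at which the previous source completed
  for (const delay of delays) {
    // Lazy: work starts when concat subscribes, i.e. at `previous`.
    // Eager: work started at time 0, so it may already be finished.
    const done = lazy ? previous + delay : Math.max(previous, delay);
    times.push(done);
    previous = done;
  }
  return times;
}

// The question's code: eager Promises, 2000 ms each.
const eagerTimes = emissionTimes([2000, 2000, 2000], false); // [2000, 2000, 2000]
// The same delays wrapped in defer.
const deferTimes = emissionTimes([2000, 2000, 2000], true); // [2000, 4000, 6000]
// The defer snippet above, with delays of 2000 - 500 * num.
const answerTimes = emissionTimes([1500, 1000, 500], true); // [1500, 2500, 3000]
```

Under this model the eager case reproduces the "everything at 2 secs" output from the question, and the deferred case gives the expected 2, 4 and 6 second timings.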

CodePudding user response:

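Using a promise in this way seems to behave similarly to debounce (https://rxjs.dev/api/operators/debounce): every promise starts its timer as soon as it is created, so all of them resolve at the same moment.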

Piping through a delay seems to do what you want, though. https://rxjs.dev/api/index/function/delay

const nums = [1, 2, 3]

const obs$ = nums.map(num => {
  return rxjs.of(num).pipe(rxjs.delay(2000))
})

rxjs.concat(...obs$).subscribe(
  num => { console.log('next', num) },
  err => { console.error('error', err.message) },
  () => { console.log('complete') }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>

CodePudding user response:

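map passes the index as the second argument to its callback, so you can multiply the setTimeout delay by it. Note that this only staggers the timers: the promises still all start immediately, so it will not help with real async calls whose duration you do not control.

For example: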

const nums = [1, 2, 3]

const obs$ = nums.map((num, i) => {
  // i is zero-based, so use i + 1 to get 2 s, 4 s and 6 s
  const promise = new Promise(resolve => setTimeout(() => resolve(num), 2000 * (i + 1)))
  return rxjs.from(promise)
})

rxjs.concat(...obs$).subscribe(
  num => { console.log('next', num) },
  err => { console.error('error', err.message) },
  () => { console.log('complete') }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>

Another option is to pipe each Observable through delay:

const nums = [1, 2, 3]

const obs$ = nums.map(num => {
  const promise = Promise.resolve(num)
  return rxjs.from(promise).pipe(rxjs.delay(2000))
})

rxjs.concat(...obs$).subscribe(
  num => { console.log('next', num) },
  err => { console.error('error', err.message) },
  () => { console.log('complete') }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>
