Rxjs stream of arrays to a single value and repeat at the end of the stream

Time:01-27

I have an observable that fetches an array of items (32 each time) from an API and emits a new response until there are no items left to fetch.

I want to process said list of items one by one, as soon as I get the first batch, until I'm done with ALL items fetched.

When I'm done with the complete list, I want to repeat the process indefinitely.

Here's what I have so far:

_dataService
  .getItemsObservable()
  .pipe(
    switchMap((items) => {
      const itemList = items.map((i) => i.itemId);
      return of(itemList);
    }),
    concatMap((itemList) =>
      from(itemList).pipe(
        concatMap((item) => {
          // do something here
        })
      )
    ),
    repeat()
  )
  .subscribe();

Any idea what I can do? Right now it loops over the first batch of items and ignores the rest.

CodePudding user response:

repeat won't call getItemsObservable() again; it re-subscribes to the observable that call already returned. Try a switchMap from a BehaviorSubject, and make the subject emit after you have processed the values. I'm really not sure why you would turn each item into an observable just to concatMap it. Just process the items after they are emitted.

const { of, BehaviorSubject, switchMap, delay } = rxjs;

const _dataService = {
  getItemsObservable: () => of(Array.from({ length: 32 }, () => Math.random()))
};

const bs$ = new BehaviorSubject();

bs$.pipe(
  delay(1000), // BehaviorSubject emits synchronously; without a delay this loop would overflow the stack
  switchMap(() => _dataService.getItemsObservable())
).subscribe(values => {
  values.forEach(val => {
    console.log('Doing stuff with ' + val);
  });
  bs$.next();
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>

CodePudding user response:

> I have an observable that fetches an array of items (32 each time) from an API and emits a new response until there are no items left to fetch.

Okay, I assume that is _dataService.getItemsObservable()?

> I want to process said list of items

What does this mean? Process how? Let's assume you have a function called processItemById that processes an itemId and returns the processed item.

> one by one as soon as I get the first batch until I'm done with ALL items fetched.

Sounds like you're turning an Observable<T[]> into an Observable<T>. You can use mergeMap (don't care about order) or concatMap (maintain order) to do this. Since you're just flattening an inner array, they'll be the same in this case.

_dataService.getItemsObservable().pipe(
  mergeMap(v => v),
  map(item => processItemById(item.itemId)),
  // Repeat here doesn't call `getItemsObservable()` again,
  // instead it re-subscribes to the observable that was returned.
  // Hopefully that's what you're counting on. It's not clear to me
  repeat()
).subscribe(processedItemOutput => {
  // Do something with the output?
});

> Any idea what I can do?

From your explanation and code, it's not clear what you're trying to do. Maybe this helps.

> Right now what happens is it will loop over the first batch of items and ignore the rest

This could happen for a number of reasons.


Tip 1

Using higher-order mapping operators with RxJS::of is a code smell. Just use a regular map instead.

For example:

concatMap(v => of(fn(v)))
// or
switchMap(v => of(fn(v)))

are the same as:

map(v => fn(v))

Tip 2

I have no idea if this would help you, but you can generate a new observable on each subscribe by using defer.

For example:

defer(() => _dataService.getItemsObservable()).pipe(
  mergeMap(v => v),
  map(item => processItemById(item.itemId)),
  repeat()
).subscribe(processedItemOutput => {
  // Do something with the output?
});