rxjs take until accumulated array length is size N

Time:12-21

I have a simple data generator function, shown below. In short, I poll values from the generator, check whether each value may be added to an array, and push it to the array if so:

function* dataGeneratorFactory() {
    while (true) {
        for (const v of "abcdefghik") {
            yield v;
        }
    }
}

const shouldExclude = (letter) => letter === "b" || letter === "e";
const items = [];
const maxItemsLength = 10;
const dataGenerator = dataGeneratorFactory();

while (items.length < maxItemsLength) {
    const nextItem = dataGenerator.next();
    if (shouldExclude(nextItem.value)) continue;
    items.push(nextItem.value);
}

console.log(items);

My goal is to translate this logic into an RxJS pattern. My only constraint is that the dataGeneratorFactory function cannot be modified. I came up with this:

// maxItemsLength, dataGenerator, and dataGeneratorFactory are
// from the code snippet above
import { from, map, mergeAll, range, takeWhile } from "rxjs";

const observeable = range(0, maxItemsLength).pipe(
    map(() => from(dataGenerator).pipe(takeWhile(shouldExclude))),
    mergeAll()
);

observeable.subscribe({
    next: (e) => console.log(e),
});

However, nothing is logged to the console. What did I do wrong in this code?

CodePudding user response:

The take operator should suffice:

import { filter, from, take, toArray } from "rxjs";

function* dataGeneratorFactory() {
  while (true) {
    for (const v of "abcdefghik") {
      yield v;
    }
  }
}

const maxItemsLength = 10;
// keep every letter except "b" and "e"
const shouldInclude = (letter) => letter !== "b" && letter !== "e";

from(dataGeneratorFactory())
  .pipe(filter(shouldInclude), take(maxItemsLength), toArray())
  .subscribe(console.log);
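
As for why the original attempt logs nothing: takeWhile(shouldExclude) passes values along only while the predicate returns true, and the first letter, "a", is not excluded, so the first inner observable completes without emitting. Stopping the iteration early also closes the shared generator, so, as far as I can tell, the later inner observables find it already finished. The sketch below models this with plain generators only; takeWhileSketch and sharedGenerator are hypothetical names written for illustration, not part of RxJS.

```javascript
// Hypothetical stand-in for takeWhile semantics (not the RxJS implementation):
// yield values while the predicate holds, stop at the first value that fails it.
function* takeWhileSketch(iterable, predicate) {
  for (const value of iterable) {
    if (!predicate(value)) return; // leaving for...of early also closes the iterator
    yield value;
  }
}

// Same generator and predicate as in the question.
function* dataGeneratorFactory() {
  while (true) {
    for (const v of "abcdefghik") {
      yield v;
    }
  }
}
const shouldExclude = (letter) => letter === "b" || letter === "e";

const sharedGenerator = dataGeneratorFactory();

// First pass: "a" is not excluded, so the predicate fails immediately.
const firstPass = [...takeWhileSketch(sharedGenerator, shouldExclude)];
// Second pass over the same generator: it was closed by the early exit.
const secondPass = [...takeWhileSketch(sharedGenerator, shouldExclude)];

console.log(firstPass, secondPass); // [] []
console.log(sharedGenerator.next().done); // true
```

This is why a predicate-based filter followed by a count-based cutoff is a better fit here than takeWhile.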

CodePudding user response:

You may want to try something like this (the comments try to explain the logic).

// dataGenerator and shouldExclude are from the question's snippet
import { filter, from, take } from "rxjs";

// generate a stream of values from the iterator using the from function
const observeable = from(dataGenerator).pipe(
  // filter out the values you do not want to keep
  filter((v) => !shouldExclude(v)),
  // then take the first 10 remaining values and complete the stream
  take(10)
);

// finally, subscribe
observeable.subscribe({
  next: (e) => console.log(e),
});
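
Operator order matters in a pipeline like this: filtering first and then taking 10 yields 10 kept letters, while taking 10 first and then filtering yields fewer, because the excluded letters count toward the 10. The sketch below approximates this with plain arrays (slice standing in for take, Array.prototype.filter for filter), so it is only an illustration of the ordering, not RxJS itself; the letters constant is an assumption covering two passes of the generator's alphabet.

```javascript
// Two passes of the letters the question's generator yields (assumed sample input).
const letters = [..."abcdefghikabcdefghik"];
const shouldExclude = (letter) => letter === "b" || letter === "e";

// "take(10)" first, then "filter": the excluded letters use up part of the 10.
const takeThenFilter = letters.slice(0, 10).filter((v) => !shouldExclude(v));

// "filter" first, then "take(10)": exactly 10 kept letters.
const filterThenTake = letters.filter((v) => !shouldExclude(v)).slice(0, 10);

console.log(takeThenFilter.length); // 8
console.log(filterThenTake.join("")); // "acdfghikac"
```

The second form matches the original while loop, which keeps polling until the array holds 10 accepted items.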

Here is a StackBlitz with the full example.
