I have a simple data generator function, shown below. In summary, I have an array: I pull values from the generator, check whether each value may be added to the array, and push it if so:
function* dataGeneratorFactory() {
  while (true) {
    for (const v of "abcdefghik") {
      yield v;
    }
  }
}
const shouldExclude = (letter) => letter === "b" || letter === "e";
const items = [];
const maxItemsLength = 10;
const dataGenerator = dataGeneratorFactory();
while (items.length < maxItemsLength) {
  const nextItem = dataGenerator.next();
  if (shouldExclude(nextItem.value)) continue;
  items.push(nextItem.value);
}
console.log(items);
My goal is to translate this logic into an RxJS pattern. My only constraint is that the dataGeneratorFactory function cannot be modified. I came up with this:
// maxItemsLength, dataGenerator, and dataGeneratorFactory are
// from the code snippet above
import { from, map, mergeAll, range, takeWhile } from "rxjs";
const observeable = range(0, maxItemsLength).pipe(
  map(() => from(dataGenerator).pipe(takeWhile(shouldExclude))),
  mergeAll()
);
observeable.subscribe({
  next: (e) => console.log(e),
});
However, nothing is logged to the console. What did I do wrong in this code?
CodePudding user response:
The take operator should suffice:
import { filter, from, take, toArray } from "rxjs";
function* dataGeneratorFactory() {
  while (true) {
    for (const v of "abcdefghik") {
      yield v;
    }
  }
}
const maxItemsLength = 10;
// keep every letter except "b" and "e"
const shouldInclude = (letter) => letter !== "b" && letter !== "e";
from(dataGeneratorFactory())
  // filter first, then take, so that 10 values remain after filtering
  .pipe(filter(shouldInclude), take(maxItemsLength), toArray())
  .subscribe(console.log);
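As for why the attempt in the question logs nothing: takeWhile(shouldExclude) only lets values through while they are excluded, and the first letter, "a", is not. The sketch below illustrates this in plain JavaScript without RxJS. takeWhileSketch is a hypothetical stand-in for the operator, not the library's implementation, and I am assuming RxJS closes the iterator on early exit in a similar way to for...of.

```javascript
// Same generator as in the question.
function* dataGeneratorFactory() {
  while (true) {
    for (const v of "abcdefghik") {
      yield v;
    }
  }
}

const shouldExclude = (letter) => letter === "b" || letter === "e";

// Hypothetical helper: a plain-generator stand-in for takeWhile.
function* takeWhileSketch(source, predicate) {
  for (const value of source) {
    // Leaving for...of early also calls source.return(), closing the generator.
    if (!predicate(value)) return;
    yield value;
  }
}

const dataGenerator = dataGeneratorFactory();

// "a" is not excluded, so the predicate fails on the very first value.
const firstPass = [...takeWhileSketch(dataGenerator, shouldExclude)];

// The early exit closed the shared generator, so it is now exhausted.
const afterEarlyExit = dataGenerator.next();

console.log(firstPass); // []
console.log(afterEarlyExit.done); // true
```

If the same happens inside from(dataGenerator), the first inner observable completes empty and the remaining nine find an already finished generator, which would explain the silent console.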
CodePudding user response:
You may want to try something like this (the comments try to explain the logic).
// dataGenerator and shouldExclude are from the question's snippet
import { filter, from, take } from "rxjs";
// start generating a stream of values from the iterator using the from function
const observeable = from(dataGenerator).pipe(
  // filter out the values you do not want to keep
  filter((v) => !shouldExclude(v)),
  // then take the first 10 remaining values and complete
  take(10)
);
// finally, subscribe
observeable.subscribe({
  next: (e) => console.log(e),
});
Here is a StackBlitz with the full example.
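The relative order of take and filter decides how many values reach the subscriber. The following is a minimal plain-JavaScript sketch without RxJS that compares both orders. filterSketch and takeSketch are hypothetical stand-ins for the operators, written only for illustration.

```javascript
// Same generator and predicate as in the question.
function* dataGeneratorFactory() {
  while (true) {
    for (const v of "abcdefghik") {
      yield v;
    }
  }
}

const shouldExclude = (letter) => letter === "b" || letter === "e";
const keep = (letter) => !shouldExclude(letter);

// Hypothetical helper: lazily passes on values that satisfy the predicate.
function* filterSketch(source, predicate) {
  for (const value of source) {
    if (predicate(value)) yield value;
  }
}

// Hypothetical helper: passes on at most `count` values, then stops.
function* takeSketch(source, count) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of source) {
    yield value;
    taken += 1;
    if (taken >= count) return;
  }
}

// take before filter: 10 raw letters are read, then "b" and "e" are dropped.
const takeThenFilter = [
  ...filterSketch(takeSketch(dataGeneratorFactory(), 10), keep),
];

// filter before take: letters are dropped first, then 10 survivors are kept.
const filterThenTake = [
  ...takeSketch(filterSketch(dataGeneratorFactory(), keep), 10),
];

console.log(takeThenFilter.length); // 8
console.log(filterThenTake.join("")); // "acdfghikac"
```

Only the second order reproduces the while loop from the question, which keeps pulling until 10 accepted items have been collected.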