Home > Software design >  What is best way of finding array element intersection on rxjs
What is best way of finding array element intersection on rxjs

Time:08-04

I know There is includes Operator on JavaScript Array.

So, Finding common elements from two other Arrays is no Problem. (https://stackoverflow.com/a/1885569/11455650)

Then, What is best way for this on rxjs?

const obs1$ = from(["foo", "bar", "baz", "qux"]);
const obs2$ = from(["bar", "garply", "fred", "foo"];

// const commonIntersection$ = functions or operators...
// result must be ["foo", "bar"]

I think there are two way for implementing this.

Which one is computationally efficient and How can I implement this with rxjs Operator?

  1. merge Two Array and emit only Second value (ignore First Value)
  2. filter each of elements

CodePudding user response:

I assume you want a running intersection from each emission? If so you can either use the scan operator, or roll your own specific intersection operator.

The scan operator is like the reduce method on arrays. In this case, for each element (which is an array of strings) the intersection is returned. Each following emission will work on that last result.

merge(from([["foo", "bar", "baz", "qux"], ["bar", "garply", "fred", "foo"]])).pipe(
  map(x => x),
  scan((acc, cur) => {
    return (!acc) 
      ? next
      : acc.filter(x => cur.includes(x));
  }, undefined as string[] | undefined),
).subscribe(x => console.log(x));

A custom operator will look cleaner, so if you need it multiple times, then go for it! As you can see the logic below is essentially the same as the scan version. It keeps track of the running intersection state in the acc variable, and the current array is used to update it.

merge(from([["foo", "bar", "baz", "qux"], ["bar", "garply", "fred", "foo"]])).pipe(
  map(x => x),
  intersection(),
).subscribe(x => console.log(x));

/*...*/

function intersection<T>(): OperatorFunction<T[], T[]> {
  return (observable: Observable<T[]>) => {
    return new Observable<T[]>((subscriber) => {
      let acc: T[] | undefined; // keeps track of the state.
      const subscription = observable.subscribe({
        next: (cur) => {
          acc = (!acc) 
            ? cur
            : acc.filter(x => cur.includes(x));
          subscriber.next(acc);
        },
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      return () => subscription.unsubscribe();
    })
  }
}

CodePudding user response:

import { forkJoin, from } from 'rxjs';
import { toArray } from 'rxjs/operators';
import { intersection } from 'lodash';

const obs1$ = from(['foo', 'bar', 'baz', 'qux']);
const obs2$ = from(['bar', 'garply', 'fred', 'foo']);

forkJoin([obs1$.pipe(toArray()), obs2$.pipe(toArray())]).subscribe(
  ([arr1, arr2]) => console.log(intersection(arr1, arr2))
);

Try here: https://stackblitz.com/edit/rxjs-6jdyzy?file=index.ts

  • Related