Home > Blockchain >  In RXJS can you process data passed to a subject.next() before re-emitting it?
In RXJS can you process data passed to a subject.next() before re-emitting it?

Time:04-12

I have a feeling that this question is going to expose some serious naivety surrounding my understanding of observables but at the risk of looking foolish let me ask it anyway.

I would like to create an RXJS subject which does some operations on data passed into it before re-emitting it. Let me explain in longhand using a simplistic example.

e.g. rolling distinctUntilChanged into the source observable

In the code below, I have one observable which is updated every time that an event happens that could alter the window's active state.

However in order to avoid emitting duplicate values I have a second observable which filters only distinct values before emitting them.


export class MyService {
  activeValue$ = new Subject<boolean>()
  active$: Observable<boolean>

  constructor() {

    // setup an observable that only emits *changed* 
    // values of the active value
    this.active$ = this.activeValue$.pipe(
      distinctUntilChanged()
    )

    // Listen to multiple cues for things that might alter active state
    window?.addEventListener('focus', () => this.setActiveValue(true))
    window?.addEventListener('click', () => this.setActiveValue(true))
    window?.addEventListener('hide', () => this.setActiveValue(false))
    window?.addEventListener('blur', () => this.setActiveValue(false))

  }
} 

Can I combine the two observables into a single observable?

This example is so simple as to be trivial but it illustrates the goal which is to pre-process the data going into an observable via .next() before re-emitting it.

I would ideally like to be able to write code that looks something like:


export class MyService {

  active$: Subject<boolean>

  constructor() {

    // This (for many reasons) doesn't work but it kind of shows what 
    // I'm hoping for...
    this.active$ = new Subject<boolean>().pipe(
      distinctUntilChanged()
    )

    // Listen to multiple cues for things that might alter active state
    ...

  }
} 

CodePudding user response:

I don't think there is a beautiful solution for this. One thing you could do is to have a second "staging" Subject. You subscribe to this Observable and add any kind of transformations or filters inside this pipe. Inside subscribe you could then push the values directly to $active.

let active$ = new Subject()
let staging = new Subject()
staging.pipe(distinctUntilChanged()).subscribe(val => {
  active$.next(val)
})


active$.subscribe(console.log)

But I think it would be better to just use Observables. Create a source$ Subject and another Observable.

let source$ = new Subject()
let active$ = source$.pipe(distinctUntilChanged()).subscribe(val => {
  active$.next(val)
})

Afterwards use active$ everywhere to subscribe and source$ to push new values.

CodePudding user response:

What you've done in your first example code is the typical way to achieve this:

  • define a private subject for pushing values
  • spin off a public observable to apply any "clean up" logic
  • consumers access the public observable to receive the "clean" emissions

Can I combine the two observables into a single observable?

From your desired code:

    this.active$ = new Subject<boolean>().pipe(
      distinctUntilChanged()
    )

This could work... BUT, you don't maintain a reference to the subject, so you aren't able to push values (subject.next()). So you do need both a subject and an observable.

You might find it cleaner to initialize your observable outside the constructor as you are doing with your subject:

export class MyService {
  private activeValue$ = new Subject<boolean>()
  public active$ = this.activeValue$.pipe(distinctUntilChanged())

  constructor() {
    // Listen to multiple cues for things that might alter active state
    window?.addEventListener('focus', () => this.setActiveValue(true))
    window?.addEventListener('click', () => this.setActiveValue(true))
    window?.addEventListener('hide', () => this.setActiveValue(false))
    window?.addEventListener('blur', () => this.setActiveValue(false))
  }

  setActiveValue(value: boolean) {
    this.activeValue$.next(value)
  }
} 

Now... The above assumes you actually need a subject to "manually" push values through. If you can get your events via other observables, you don't need a subject at all. In the case of events, you may choose to use fromEvent:

export class MyService {

  public active$ = merge(
     fromEvent(window, 'focus').pipe(mapTo(true)),
     fromEvent(window, 'click').pipe(mapTo(true)),
     fromEvent(window, 'hide').pipe(mapTo(false)),
     fromEvent(window, 'blur').pipe(mapTo(false)),
  ).pipe(
    distinctUntilChanged(),
    shareReplay(1) // add this if you want subscribers to receive the
  )                // most recent emission upon subscription which
                   // is often desired in angular services
} 
  • Related