I have a feeling that this question is going to expose some serious naivety in my understanding of observables, but at the risk of looking foolish, let me ask it anyway.
I would like to create an RxJS subject that performs some operations on the data passed into it before re-emitting it. Let me explain in longhand using a simplistic example, e.g. rolling distinctUntilChanged into the source observable.
In the code below, I have one observable which is updated every time an event happens that could alter the window's active state.
However, to avoid emitting duplicate values, I have a second observable which filters only distinct values before emitting them.
import { Observable, Subject } from 'rxjs'
import { distinctUntilChanged } from 'rxjs/operators'
export class MyService {
  activeValue$ = new Subject<boolean>()
  active$: Observable<boolean>
  constructor() {
    // setup an observable that only emits *changed*
    // values of the active value
    this.active$ = this.activeValue$.pipe(
      distinctUntilChanged()
    )
    // Listen to multiple cues for things that might alter active state
    window?.addEventListener('focus', () => this.setActiveValue(true))
    window?.addEventListener('click', () => this.setActiveValue(true))
    window?.addEventListener('hide', () => this.setActiveValue(false))
    window?.addEventListener('blur', () => this.setActiveValue(false))
  }
  // Push the raw (possibly duplicate) value into the source subject
  setActiveValue(value: boolean) {
    this.activeValue$.next(value)
  }
}
Can I combine the two observables into a single observable?
This example is so simple as to be trivial, but it illustrates the goal, which is to pre-process the data going into an observable via .next() before re-emitting it.
I would ideally like to be able to write code that looks something like:
export class MyService {
  active$: Subject<boolean>
  constructor() {
    // This (for many reasons) doesn't work but it kind of shows what
    // I'm hoping for...
    this.active$ = new Subject<boolean>().pipe(
      distinctUntilChanged()
    )
    // Listen to multiple cues for things that might alter active state
    ...
  }
}
CodePudding user response:
I don't think there is a beautiful solution for this. One thing you could do is to have a second "staging" Subject. You subscribe to this Subject and add any transformations or filters inside its pipe. Inside subscribe you then push the values directly to active$.
const active$ = new Subject()
const staging = new Subject()
// Filter in the staging pipeline, then forward the surviving values to active$
staging.pipe(distinctUntilChanged()).subscribe(val => {
  active$.next(val)
})
active$.subscribe(console.log)
But I think it would be better to just use Observables. Create a source$ Subject and derive another Observable from it.
const source$ = new Subject()
// active$ is the piped Observable itself; there is no need to subscribe and re-push here
const active$ = source$.pipe(distinctUntilChanged())
Afterwards, use active$ everywhere to subscribe and source$ to push new values.
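As a rough illustration of that usage, here is a minimal sketch. It assumes RxJS 6 or 7 import paths, and the `received` array is only a hypothetical stand-in for a real consumer.

```typescript
import { Subject } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators";

// source$ is the only place values are pushed into
const source$ = new Subject<boolean>();
// active$ is what consumers subscribe to; consecutive duplicates are dropped
const active$ = source$.pipe(distinctUntilChanged());

// Hypothetical consumer that just records what it receives
const received: boolean[] = [];
active$.subscribe((value) => received.push(value));

// Push a sequence containing consecutive duplicates
[true, true, false, false, true].forEach((value) => source$.next(value));

console.log(received); // [ true, false, true ]
```

Only the changes of value reach the subscriber, while the code pushing values never needs to know about the filtering.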
CodePudding user response:
What you've done in your first example code is the typical way to achieve this:
- define a private subject for pushing values
- spin off a public observable to apply any "clean up" logic
- consumers access the public observable to receive the "clean" emissions
Can I combine the two observables into a single observable?
From your desired code:
this.active$ = new Subject<boolean>().pipe(
distinctUntilChanged()
)
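This could work... BUT you don't keep a reference to the subject, so you can't push values into it (subject.next()). Also, pipe() is typed as returning a plain Observable, so the result can't be assigned to a field declared as Subject<boolean>. So you do need both a subject and an observable.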
You might find it cleaner to initialize your observable outside the constructor as you are doing with your subject:
export class MyService {
  private activeValue$ = new Subject<boolean>()
  public active$ = this.activeValue$.pipe(distinctUntilChanged())
  constructor() {
    // Listen to multiple cues for things that might alter active state
    window?.addEventListener('focus', () => this.setActiveValue(true))
    window?.addEventListener('click', () => this.setActiveValue(true))
    window?.addEventListener('hide', () => this.setActiveValue(false))
    window?.addEventListener('blur', () => this.setActiveValue(false))
  }
  setActiveValue(value: boolean) {
    this.activeValue$.next(value)
  }
}
Now... The above assumes you actually need a subject to "manually" push values through. If you can get your events via other observables, you don't need a subject at all. In the case of events, you may choose to use fromEvent:
export class MyService {
  public active$ = merge(
    fromEvent(window, 'focus').pipe(mapTo(true)),
    fromEvent(window, 'click').pipe(mapTo(true)),
    fromEvent(window, 'hide').pipe(mapTo(false)),
    fromEvent(window, 'blur').pipe(mapTo(false)),
  ).pipe(
    distinctUntilChanged(),
    // add shareReplay(1) if you want subscribers to receive the most recent
    // emission upon subscription, which is often desired in angular services
    shareReplay(1)
  )
}
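To see what shareReplay(1) changes for a late subscriber, here is a small sketch that can run outside the browser. It assumes RxJS 6 or 7 import paths and swaps the window events for a hypothetical `source$` Subject. The `early` and `late` arrays are illustrative names only.

```typescript
import { Subject } from "rxjs";
import { distinctUntilChanged, shareReplay } from "rxjs/operators";

// Hypothetical stand-in for the merged window events
const source$ = new Subject<boolean>();
const active$ = source$.pipe(distinctUntilChanged(), shareReplay(1));

const early: boolean[] = [];
const late: boolean[] = [];

// The first subscriber connects the pipeline to source$
active$.subscribe((value) => early.push(value));
source$.next(true);
source$.next(true); // consecutive duplicate, filtered out
source$.next(false);

// A late subscriber immediately receives the most recent value (false)
active$.subscribe((value) => late.push(value));
source$.next(false); // still a duplicate for the shared pipeline
source$.next(true);

console.log(early); // [ true, false, true ]
console.log(late); // [ false, true ]
```

Note that shareReplay only connects to the source once something subscribes, so values emitted before the first subscription are not replayed.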