I have some RxJS streams that emit changes:
reportChanges$
objectPropChanges$
textPropsChanges$
The first two have default values; the last one does not.
I tried to collect all changes and store them:
this.changes$ = combineLatest([
this.reportChanges$,
this.objectPropChanges$,
this.textPropsChanges$,
]).pipe(
debounceTime(this.debouncetime),
map(([reportProperties, reportObjectsProperties, textProperties]) => {
return { reportObjectsProperties, reportProperties, textProperties };
}),
);
Then to store it:
this.changes$
.pipe(
skip(1),
switchMap((properties: ReportPropertiesRequest) => this.save(this.registryId, properties)),
)
.subscribe();
The issue is that combineLatest emits only after every source stream has emitted at least once. In my case textPropsChanges$ has not emitted anything yet.
How can I collect all changes and store them?
My goal is to collect all changes that happen asynchronously and save them to storage immediately.
A possible solution is to set a start value for all streams with startWith, or to use a BehaviorSubject. But I cannot do that in my case, because the changes have to come from the user.
CodePudding user response:
You can start the stream with a placeholder value and ignore it downstream. Here I've used null as an example:
this.changes$ = combineLatest([
this.reportChanges$,
this.objectPropChanges$,
this.textPropsChanges$.pipe(startWith(null)),
]).pipe(
debounceTime(this.debouncetime),
map(([reportProperties, reportObjectsProperties, textProperties]) =>
textProperties != null ? // Only include if not null
{ reportObjectsProperties, reportProperties, textProperties } :
{ reportObjectsProperties, reportProperties }
),
);
CodePudding user response:
You could keep a local object, changes = {}, then subscribe to each stream separately and write its latest value into that object:
this.reportChanges$.subscribe((data) => (this.changes.report = data));
this.objectPropChanges$.subscribe((data) => (this.changes.objectProp = data));
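The same idea can be written without mutating a shared object. Below is a minimal sketch, assuming each stream's value is turned into a partial "patch" that is merged into an accumulated state. The names Changes and applyChange are hypothetical and not part of the original code; the emissions are simulated with plain function calls so the snippet runs without RxJS. In a real pipeline the reducer could probably be passed to scan after merge-ing the three mapped streams.

```typescript
// Hypothetical shape of the accumulated changes. Every field is optional
// because a stream may not have emitted anything yet.
interface Changes {
  reportProperties?: unknown;
  reportObjectsProperties?: unknown;
  textProperties?: unknown;
}

// Merge one partial update into the accumulated state without mutating it.
function applyChange(state: Changes, patch: Partial<Changes>): Changes {
  return { ...state, ...patch };
}

// Simulated emissions: the text stream has not emitted anything yet,
// so textProperties is simply absent from the accumulated object.
let changes: Changes = {};
changes = applyChange(changes, { reportProperties: { title: "Q1" } });
changes = applyChange(changes, { reportObjectsProperties: { color: "red" } });
console.log(changes);

// A later text change fills in the missing field; earlier fields are kept.
changes = applyChange(changes, { textProperties: { font: "Arial" } });
console.log(changes);
```

Because a field only appears once its stream has emitted, no start value is needed for textPropsChanges$, and the object can be saved after every update.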