RxJs observable data processing in Angular service

Time:02-04

I am having trouble combining data from an RxJs observable with other data in an Angular service.

I am working with Angular 15 and standalone components, but also trying to follow some best practices for working with data.

My component is to show data from various sources. The issue I have is combining all of that data into a response from my service. I believe my issue is simply in not fully understanding the RxJs pipe, subscribe, map functionality.

In my example service, I can get the different parts working correctly, but I cannot combine the results into what I want.

I will omit a lot of the service that is not relevant to this task. Basically, my code is fetching data from a web service back end using Http, and getting that response as an observable of data in the database.

My application then has a local copy of data, in an array, that may have some of the same records, different records, etc. I am trying to combine the results into an internal BehaviorSubject that would then be used by my component to show the data.

    private ApiEndPoint: string = '/timeentry';
    private Data$_: BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>> = new BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>>(new Array<LIB_TimeEntry_ResponseDTO>());
    private MergedData$_: BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>> = new BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>>(new Array<LIB_TimeEntry_ResponseDTO>());

    constructor(
        private Http_: HttpClient,
    ) {}

    // component uses this with async pipe and can loop through and show data
    get Entries$(): Observable<Array<LIB_TimeEntry_ResponseDTO>> {
        return this.Data$_.asObservable();
    }

    // component shows this value using async pipe, which is correct, so the Entries$ does have the data from the backend
    get Count$(): Observable<number> {
        return this.Entries$.pipe(
            map( (Entries: Array<LIB_TimeEntry_ResponseDTO>) => {
                return Entries.length;
            })
        );
    }

    get MergedData$(): Observable<Array<LIB_TimeEntry_ResponseDTO>> {
        return this.MergedData$_.asObservable();
    }
    
    // component calls this to load initial data when starting
    public Initialize(BaseURL: string, UserId: string, EntryDate: number): void {
        const AccessEndPoint: string = `user/${UserId}/${EntryDate}`;
        this.Subscription$_ = this.Http_.get<Array<LIB_TimeEntry_ResponseDTO>>(`${BaseURL}${this.ApiEndPoint}/${AccessEndPoint}`, { observe: 'response' }).pipe(
            catchError(() => {
                console.log('Error received in TimeSheet Service');
                return throwError(() => {
                    console.log('Error rethrown by TimeSheet Service');
                    return new Error(`Unable to load Time Sheet data`);
                })
            })
        ).subscribe(data => {
            this.Data$_.next(data.body as Array<LIB_TimeEntry_ResponseDTO>); // Loads into internal BehaviorSubject
        });
    }

    // Getting the data from the backend seems to work.  Now I am trying to loop through this data, and combine it with an array
    // of data I have into a new BehaviorSubject.  In my testing I call this as a public function from the component but it doesn't work
    public MergeDatabaseDataWithUploadedData_(LocalData: Array<LIB_TimeEntry_ResponseDTO>): void {
        const DatabaseData: Array<LIB_TimeEntry_ResponseDTO> = new Array<LIB_TimeEntry_ResponseDTO>(); // Create the array to build the data into
        
        this.Entries$.pipe( // Follow Count$() call to get the Array (Data) to work with
        map ( Data => {
            Data.forEach( (OneEntry: LIB_TimeEntry_ResponseDTO) => { // Loop through each record and add it to my Array for subsequent use.
                DatabaseData.push(OneEntry); // Data does not appear into my array. Trying things, as I did want this to be a Map, but array illustrates the point.
                // const KeyValue = this.BuildUniqueKey(OneEntry); 
                // Map.set(KeyValue, OneEntry);
            });
        })
        // ).subscribe(Data => { // Had tried this thinking it was the problem?
        //  return Data;
        // })
        ;
        console.log(DatabaseData); // No Data???

        // At this point, I will process through my LocalData updating information
        LocalData.forEach( (OneRecord: LIB_TimeEntry_ResponseDTO) => {
            ... // Determine if record exists
            const LocalKey = this.BuildUniqueKey(OneRecord);
            if (Map.has(LocalKey)) {
                ... // Make changes to OneRecord
                Map.set(LocalKey, OneRecord); // Update Map entry from DB with Local Results
            } else {
                ... // Make OneRecord indicate this is a new record
                Map.set(LocalKey, OneRecord); 
            }
            ... // Additional logic to indicate records to be deleted
        });
        const MergedData: Array<LIB_TimeEntry_ResponseDTO> = new Array<LIB_TimeEntry_ResponseDTO>(); // Make array to process Map into
        Map.forEach( (OneMapRecord) => {
            MergedData.push(OneMapRecord);
        });
        this.MergedData$_.next(MergedData); // This would be the Map data that has been built
        
    }

I have more code trying to merge data etc., but as I cannot simply convert the data into an internal array, the remainder of the code will obviously not work until this first issue is resolved.

Once I can populate the Array(Map in the future) from the observable, I will then loop through my local array data, and update the records in the Array from the observable. This updated

CodePudding user response:

In reference to this part of your code:

01    public MergeDatabaseDataWithUploadedData_(LocalData: Array<LIB_TimeEntry_ResponseDTO>): void {
02        const DatabaseData: Array<LIB_TimeEntry_ResponseDTO> = new Array<LIB_TimeEntry_ResponseDTO>(); // Create the array to build the data into
03        
04        this.Entries$.pipe( // Follow Count$() call to get the Array (Data) to work with
05        map ( Data => {
06            Data.forEach( (OneEntry: LIB_TimeEntry_ResponseDTO) => { // Loop through each record and add it to my Array for subsequent use.
07                DatabaseData.push(OneEntry); // Data does not appear into my array. Trying things, as I did want this to be a Map, but array illustrates the point.
08                // const KeyValue = this.BuildUniqueKey(OneEntry); 
09                // Map.set(KeyValue, OneEntry);
10            });
11        })
12        // ).subscribe(Data => { // Had tried this thinking it was the problem?
13        //  return Data;
14        // });
15        console.log(DatabaseData); // No Data???
...

There are two reasons the console.log() on line 15 shows an empty array:

  • You are not subscribing. If you don't subscribe, the logic within the .pipe() is never executed.
  • Even if you do subscribe, DatabaseData can still be empty, because the backend data arrives asynchronously
    • line 04 simply defines an observable
    • line 12 (if uncommented) creates a subscription; a BehaviorSubject immediately replays its current value, which is still the initial empty array until the HTTP response arrives
    • line 15 is executed, but the backend data hasn't passed through the pipe yet, therefore your array is still empty

Hopefully that makes sense.
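
To make those two points concrete, here is a minimal sketch rather than your actual service. It assumes RxJS 7 (the version Angular 15 ships with), and the names entries$, doubled$, mapRuns and received are made up for illustration. A BehaviorSubject stands in for your internal Data$_, and a manual next() call stands in for the HTTP response arriving later.

```typescript
import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical stand-in for the service's internal BehaviorSubject.
const entries$ = new BehaviorSubject<number[]>([]);

// Counts how often the map callback actually runs.
let mapRuns = 0;

// Defining the pipe does not execute it: nothing has subscribed yet.
const doubled$ = entries$.pipe(
  map(data => {
    mapRuns++;
    return data.map(n => n * 2);
  })
);
const runsBeforeSubscribe = mapRuns; // map has not run at this point

// Subscribing activates the pipe. The BehaviorSubject replays its current
// value, which is still the initial empty array.
const received: number[][] = [];
const subscription = doubled$.subscribe(data => {
  received.push(data);
});

// Simulates the backend response arriving some time later.
entries$.next([1, 2, 3]);

subscription.unsubscribe();
```

Anything that depends on the emitted data has to live inside the pipe or the subscribe callback. Code placed after the subscribe call only sees whatever has been emitted up to that moment.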


You have a lot of code above, so I'm going to use a simplified scenario to show a general example of how to compose observable streams from one another. This typically involves defining a new observable that starts from another observable source and pipes its emissions through some operators to produce the desired output (e.g. newObs$ = obs$.pipe(...);). You typically won't create a method that returns an observable; you just define the observable as a property.

RxJS provides a plethora of operators and static functions to help us create observables with well-defined behavior. It's important to think about the "what" and the "when" aspect of the observable you are defining.

  • What do you want it to emit?
  • When do you want it to emit?

In the case of wanting to merge the emissions from an observable source with an array of values:

  • What: the latest emission from the observable combined with the array
  • When: whenever the source observable emits

private data$ = new BehaviorSubject<Data[]>([]);
private localData: Data[] = []; // initialized so concat() never receives undefined

mergedData$ = this.data$.pipe(
    map(data => data.concat(this.localData))
);

The above mergedData$ observable will emit the combined array whenever data$ emits a value.


I'm not sure about your particular use case though. Where does your LocalData come from? Is it the result of some other observable? If so, the approach would be different, because in that case we'd want our mergedData$ to emit whenever either of our two sources emit.

RxJS provides a static function that is perfect for this scenario called combineLatest. combineLatest takes multiple sources and, once every source has emitted at least once, emits an array of the latest value from each source whenever any of them emits. Here's an example:

private data$ = new BehaviorSubject<Data[]>([]);
private otherData$ = new BehaviorSubject<Data[]>([]);

mergedData$ = combineLatest([this.data$, this.otherData$]).pipe(
    map(([data, otherData]) => data.concat(otherData))
);

Here, mergedData$ will emit:

  • What - the concatenation of the latest emission of data$ and otherData$
  • When - whenever either data$ or otherData$ emits a value
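
To see the "what" and "when" in action, here is a small runnable sketch of the same idea outside of a class. It assumes RxJS 7 and uses string arrays purely for illustration; the emissions array is a hypothetical helper that records what a subscriber receives.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

const data$ = new BehaviorSubject<string[]>([]);
const otherData$ = new BehaviorSubject<string[]>([]);

// Emits the concatenation of the latest value from each source.
const mergedData$ = combineLatest([data$, otherData$]).pipe(
  map(([data, otherData]) => data.concat(otherData))
);

// Record every value a subscriber would see.
const emissions: string[][] = [];
const subscription = mergedData$.subscribe(value => {
  emissions.push(value);
});

// Both sources are BehaviorSubjects, so each already has a value and
// combineLatest emits once right away with the two empty arrays combined.
data$.next(['a']);       // emits ['a']
otherData$.next(['b']);  // emits ['a', 'b']
data$.next(['c', 'd']);  // emits ['c', 'd', 'b']

subscription.unsubscribe();
```

If one of the sources were a plain Subject or an HTTP call that had not yet produced a value, mergedData$ would stay silent until that source emitted for the first time.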

If your merge logic were more complicated, you'd simply change what the map callback returns:

mergedData$ = combineLatest([this.data$, this.data2$, this.data3$]).pipe(
    map(([data, data2, data3]) => {
        // return something here...
    })
);
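
As one possibility for what could go inside that map callback, here is a sketch of a keyed merge along the lines the question describes. Everything in it is an assumption for illustration: the Entry shape, the isNew flag, and this buildUniqueKey implementation are invented, since the real LIB_TimeEntry_ResponseDTO and BuildUniqueKey are not shown. It is written as a plain function so it can be tested without any observables.

```typescript
// Hypothetical record shape; the real DTO is not shown in the question.
interface Entry {
  userId: string;
  entryDate: number;
  hours: number;
  isNew?: boolean;
}

// Hypothetical key builder standing in for the question's BuildUniqueKey.
const buildUniqueKey = (entry: Entry): string => `${entry.userId}|${entry.entryDate}`;

// Database records go into a Map first; local records then either
// overwrite a matching database record or are added and flagged as new.
function mergeEntries(databaseData: Entry[], localData: Entry[]): Entry[] {
  const byKey = new Map<string, Entry>();
  databaseData.forEach(entry => {
    byKey.set(buildUniqueKey(entry), entry);
  });
  localData.forEach(entry => {
    const key = buildUniqueKey(entry);
    byKey.set(key, { ...entry, isNew: !byKey.has(key) });
  });

  const merged: Entry[] = [];
  byKey.forEach(entry => {
    merged.push(entry);
  });
  return merged;
}

// Example input: one record only in the database, one in both, one only local.
const merged = mergeEntries(
  [
    { userId: 'u1', entryDate: 20230204, hours: 8 },
    { userId: 'u1', entryDate: 20230205, hours: 4 },
  ],
  [
    { userId: 'u1', entryDate: 20230205, hours: 6 },
    { userId: 'u1', entryDate: 20230206, hours: 2 },
  ]
);
```

With a function like this, the map callback could be as short as map(([data, localData]) => mergeEntries(data, localData)), which keeps the merge rules in one place and out of the stream definition.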