Home > OS >  Combine subject and observer in a single subscribe RxJs
Combine subject and observer in a single subscribe RxJs

Time:06-02

I have an Behavior Subject

private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();

constructor() { }

set updatedUsers(users: ActionedUser[]) {
    this.usersSubject.next(users);
}

and an Observable

getCurrentUser(): Observable<WebAuthUser> {
    if (environment.production) {
        return this.http.get<WebAuthUser>(`${this.webAuthURL}/wauth/api/user`, { withCredentials: true });
    }
    const devUser: WebAuthUser = {
        userName: 'local_dev_user',
    };

    return of(devUser);
}

i want to join them into a single subscribe but for some reason when i use forkJoin() and subscribe to it nothing is being emitted

 forkJoin([
        this.saveUsersSubject.usersChange,
        this.api.getCurrentUser(),
    ]);
    this.getUsersDataAndLoggedUserSubscription.subscribe(([ userData, loggedUser]) => {
        
        this.actionedUsers = userData;
        this.currentUser = loggedUser;
    });

i think that it is due to the Subject but what can be the best way to join them and provide value once both are done?

CodePudding user response:

Option 1: reactive method (IMO elegant)

forkJoin would only emit when both observables complete. The quickest way would be to tack in a take(1) to the BehaviorSubject observable. But this assumes you do not need the future emissions from the BehaviorSubject. If you wish to react to each emission of the BehaviorSubject, then you could use combineLatest or zip (note: they aren't synonymous) instead of forkJoin.

import { take } from 'rxjs/operators';

forkJoin([
  this.saveUsersSubject.usersChange.pipe(take(1)),
  this.api.getCurrentUser()
]);

Option 2: synchronous method (IMO inelegant)

As opposed to Subject and ReplaySubject, BehaviourSubject contains a special trait that it "holds" the last value pushed to it. You could access it anytime synchronously using the the value getter or the getValue() method (both essentially serve the same purpose).

private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();

constructor() { }

set updatedUsers(users: ActionedUser[]) {
  this.usersSubject.next(users);
}

public actionedUsers(): ActionedUser[] {
  return this.usersSubject.value;
}
this.api.getCurrentUser().subscribe({
  next: (currentUser: any) => {  
    this.actionedUsers = this.saveUsersSubject.actionedUsers();
    this.currentUser = loggedUser;
  },
  error: (error: any) => {
    // handle errors
  }
});
  • Related