Merge multiple observables that are dependent on the first

Time:08-03

I have 3 observables. The first emits a value that the other 2 need in order to run. Here is an example of how it currently looks:

const observable_one = defer(() => websocketConnect('url'))

const obsv = observable_one.pipe(
   tap((client: wsClient) => {
      concat(
         defer(() => authenticate(client)),
         defer(() => subscribe(client, 'channels'))
      )
   }),
   mergeMap((client: wsClient) => {
      // web socket messaging logic etc...
   })
)

However, this implementation never subscribes to the observables inside the concat. The two inner observables are side-effecting and return void. They just need to run before messages are received in the mergeMap, hence the use of tap, but they do require access to the client. Is there a way I can achieve this? Any help or advice would be greatly appreciated.

CodePudding user response:

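You can replace tap with mergeMap and return the inner observable, which mergeMap then subscribes to and runs. Then pass the client object downstream with mapTo. Note that mapTo emits the client once for every value the inner observable emits: if authenticate and subscribe each emit a value, the second mergeMap runs twice, and if they emit nothing, it never runs.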

const obsv = observable_one.pipe(
   mergeMap((client: wsClient) => 
      concat(
         defer(() => authenticate(client)),
         defer(() => subscribe(client, 'channels'))
      ).pipe(mapTo(client))
   ),
   mergeMap((client: wsClient) => {
      // web socket messaging logic etc...
   })
)
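
To pass the client on exactly once, however many values the inner observables emit, one option is ignoreElements followed by endWith. This is a minimal sketch, not the asker's real code: WsClient, websocketConnect, authenticate and subscribe below are hypothetical synchronous stand-ins, and the log array exists only to show the order in which things run.

```typescript
import { concat, defer, of } from 'rxjs';
import { endWith, ignoreElements, mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for the question's client type and helpers.
interface WsClient { id: string }
const log: string[] = [];
const websocketConnect = (url: string) => of<WsClient>({ id: url });
const authenticate = (client: WsClient) =>
  defer(() => { log.push(`auth:${client.id}`); return of(undefined); });
const subscribe = (client: WsClient, channels: string) =>
  defer(() => { log.push(`sub:${client.id}:${channels}`); return of(undefined); });

const obsv = defer(() => websocketConnect('url')).pipe(
  mergeMap((client) =>
    concat(authenticate(client), subscribe(client, 'channels')).pipe(
      ignoreElements(), // drop whatever the two setup steps emit
      endWith(client),  // emit the client once, after both have completed
    ),
  ),
  mergeMap((client) => {
    // Placeholder for the web socket messaging logic.
    log.push(`messages:${client.id}`);
    return of(`ready:${client.id}`);
  }),
);

const received: string[] = [];
obsv.subscribe((value) => received.push(value));
```

With these stand-ins, the messaging step runs a single time and only after authentication and channel subscription have both completed.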

CodePudding user response:

The "bug" here is that the callback passed to tap() does not return anything, and tap would ignore a return value anyway. It is not a bug that concat() is never subscribed inside tap: tap is only for side effects, it does not subscribe to anything you create in it, and you should not subscribe manually inside it. Use concatMap() instead and return the concat(). It will then be subscribed, because that is how higher-order mapping operators work: they subscribe to the inner observable you return.

const observable_one = defer(() => websocketConnect('url'))

const obsv = observable_one.pipe(
   concatMap((client: wsClient) => {
      return concat(
         defer(() => authenticate(client)),
         defer(() => subscribe(client, 'channels'))
      )
   }),
   tap((value_from_authenticate_OR_subscribe) => {
      // web socket messaging logic etc...
   })
)
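
The difference between tap and a higher-order mapping operator can be seen in isolation. In this small sketch, effect is a hypothetical helper that records its label only when something subscribes to it.

```typescript
import { defer, of } from 'rxjs';
import { concatMap, tap } from 'rxjs/operators';

const ran: string[] = [];

// Hypothetical cold observable: the side effect happens on subscription only.
const effect = (label: string) =>
  defer(() => { ran.push(label); return of(label); });

// tap ignores whatever its callback creates, so this observable never runs.
of('client').pipe(
  tap(() => { effect('inside tap'); }),
).subscribe();

// concatMap subscribes to the observable returned by its callback.
of('client').pipe(
  concatMap(() => effect('inside concatMap')),
).subscribe();

console.log(ran); // only 'inside concatMap' was recorded
```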

Although, looking at your code, you presumably do not want to use concat, because you want to use the values from both authenticate and subscribe, right? Take a look at combineLatest.
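
As a rough sketch of that suggestion, assuming (unlike the void versions in the question) that authenticate and subscribe each emit a value: the helpers, the token and ack strings, and WsClient below are all hypothetical. Keep in mind that combineLatest subscribes to both sources at once, so it does not by itself guarantee that authentication finishes before the channel subscription starts.

```typescript
import { combineLatest, defer, of } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';

// Hypothetical stand-ins that emit a value instead of returning void.
interface WsClient { id: string }
const websocketConnect = (url: string) => of<WsClient>({ id: url });
const authenticate = (client: WsClient) => of(`token-for-${client.id}`);
const subscribe = (client: WsClient, channels: string) =>
  of(`subscribed:${client.id}:${channels}`);

const results: Array<{ client: WsClient; token: string; ack: string }> = [];

defer(() => websocketConnect('url')).pipe(
  concatMap((client) =>
    combineLatest([
      defer(() => authenticate(client)),
      defer(() => subscribe(client, 'channels')),
    ]).pipe(
      // Keep the client alongside the latest value from each source.
      map(([token, ack]) => ({ client, token, ack })),
    ),
  ),
).subscribe((result) => results.push(result));
```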
