Emitting events from an observable

Time:08-03

I'm trying to create an observable that connects to a stream over a websocket and then emits the events it receives.

Here is a basic example of how I'd like to model it:

const observable = defer(() => connectToWs()).pipe(
   tap(wsClient => {
      wsClient.subscribe('channel')

      wsClient.messageReceived = (msg) => {
         // emit msg here
      }
   })
)

But I'm a bit stuck on how to emit the msg variable after using a creation operator like defer. Any help or advice with RxJS would be greatly appreciated.

CodePudding user response:

You can create a new observable inside switchMap and call next on it each time a message arrives.

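import { defer, Observable } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

const observable = defer(() => connectToWs()).pipe(
   tap(wsClient => wsClient.subscribe('channel')),
   switchMap(wsClient => {
      return new Observable(obs => {
         // Forward every incoming message to the subscriber
         wsClient.messageReceived = (msg) => {
            obs.next(msg)
         }
      })
   })
)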
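
For anyone who wants to try this pattern without a real socket, here is a minimal sketch. The `WsClient` shape, the `fakeClient` object and this `connectToWs` are hypothetical stand-ins, not the asker's actual code. It returns `of(fakeClient)` so everything runs synchronously; a real `connectToWs` probably returns a Promise, which `defer` also accepts.

```typescript
import { defer, Observable, of } from "rxjs";
import { switchMap, tap } from "rxjs/operators";

// Hypothetical client shape, inferred from the question.
interface WsClient {
  subscribe(channel: string): void;
  messageReceived?: (msg: string) => void;
}

let connectionCount = 0;
const fakeClient: WsClient = { subscribe: () => {} };

// Hypothetical stand-in for the real connection function.
const connectToWs = (): Observable<WsClient> => {
  connectionCount += 1;
  return of(fakeClient);
};

const messages$ = defer(() => connectToWs()).pipe(
  tap((wsClient) => wsClient.subscribe("channel")),
  switchMap(
    (wsClient) =>
      new Observable<string>((obs) => {
        // Forward each message from the client callback into the stream.
        wsClient.messageReceived = (msg) => obs.next(msg);
      })
  )
);

// defer is lazy: connectToWs has not been called at this point.
const countBeforeSubscribe = connectionCount;

const seen: string[] = [];
const sub = messages$.subscribe((msg) => seen.push(msg));

// Simulate the server pushing a message through the client callback.
fakeClient.messageReceived?.("hello");
sub.unsubscribe();
```

Note that the connection is only opened when something subscribes, which is the reason for wrapping `connectToWs` in `defer`.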

CodePudding user response:

Rather than manipulating your client objects within an observable subscription, I suggest encapsulating the client subscribe/unsubscribe logic by writing a function with the signature:

(client: WsClient, channel: string) => Observable<Message>

(I'm making up the types here, but hopefully you get the idea.)

const observeChannel = (
  client: WsClient,
  channel: string
): Observable<Message> => {
  return new Observable<Message>((observer) => {
    client.subscribe(channel);

    client.messageReceived = (msg: Message) => {
      observer.next(msg);
    };
    return () => {
      // unsubscribe logic
    };
  });
};

const observable = defer(() => connectToWs()).pipe(
  switchMap((client) => observeChannel(client, "channel"))
);
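
As a rough illustration of what the teardown function might do, here is a self-contained sketch. The `Message` shape, the `unsubscribe` method on the client and the `FakeClient` class are all assumptions made up for this example; your real client may expose a different API for leaving a channel.

```typescript
import { Observable } from "rxjs";

// Hypothetical types; adjust to whatever your client actually provides.
interface Message {
  body: string;
}
interface WsClient {
  subscribe(channel: string): void;
  unsubscribe(channel: string): void; // assumed, not shown in the question
  messageReceived: ((msg: Message) => void) | null;
}

const observeChannel = (
  client: WsClient,
  channel: string
): Observable<Message> =>
  new Observable<Message>((observer) => {
    client.subscribe(channel);
    client.messageReceived = (msg) => observer.next(msg);
    // Teardown: runs when the consumer unsubscribes or switchMap switches away.
    return () => {
      client.messageReceived = null;
      client.unsubscribe(channel);
    };
  });

// In-memory fake client, used only to exercise the sketch.
class FakeClient implements WsClient {
  channels = new Set<string>();
  messageReceived: ((msg: Message) => void) | null = null;
  subscribe(channel: string): void {
    this.channels.add(channel);
  }
  unsubscribe(channel: string): void {
    this.channels.delete(channel);
  }
  // Simulates the server pushing a message.
  push(body: string): void {
    this.messageReceived?.({ body });
  }
}

const client = new FakeClient();
const received: string[] = [];
const subscription = observeChannel(client, "channel").subscribe((msg) =>
  received.push(msg.body)
);

client.push("first");
client.push("second");
subscription.unsubscribe();
client.push("ignored"); // the handler was cleared by the teardown
```

Keeping the subscribe and unsubscribe calls together in one function means the channel is released whenever the observable is unsubscribed, without the caller having to remember it.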