I'm trying to create an observable that will make a connection with a stream through a websocket then emit those events.
here is a basic example of how I'd like to model it:
const observable = defer(() => connectToWs()).pipe(
tap(wsClient => {
wsClient.subscribe('channel')
wsClient.messageReceived = (msg) => {
// emit msg here
}
}
)
But I'm a bit stuck on how to emit the msg
variable after using a creation operator like defer
, any help or advice with rxjs would be greatly appreciated.
CodePudding user response:
You can to create an observable
inside switchMap
for message to be emitted.
const observable = defer(() => connectToWs()).pipe(
tap(wsClient =>wsClient.subscribe('channel'))
switchMap(wsClient=>{
return new Observable(obs=>{
wsClient.messageReceived = (msg) => {
obs.next(msg)
}
}
}
)
CodePudding user response:
Rather than manipulating your client objects within an observable subscription, I suggest encapsulating the client subscribe/unsubscribe logic by writing a function with the signature:
(client: WsClient) => Observable<Message>
(I'm making up the types here, but hopefully you get the idea.)
const observeChannel = (
client: WsClient,
channel: string
): Observable<Message> => {
return new Observable<Message>((observer) => {
client.subscribe("channel");
client.messageReceived = (msg: Message) => {
observer.next(msg);
};
return () => {
// unsubscribe logic
};
});
};
const observable = defer(() => connectToWs()).pipe(
switchMap((client) => observeChannel(client, "channel"))
);