I have a Redis pub/sub connection in my Go websocket app, so whenever a client connects and subscribes to a channel, the server listens on that channel and sends its messages to the client.
However, say Client 1 is subscribed to channel X, so the pub/sub connection starts listening and receiving messages from it.
Now, Client 1 also subscribes to channel Y, so the server should also listen to messages from that channel; however, it stops listening to X and only listens to Y.
// Receive loop for the shared Redis pub/sub connection. Each iteration blocks
// in Receive and dispatches on the dynamic type of the value it returns.
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
// A published message arrived: fan it out to every subscription registered
// for that channel (a nil client means "do not filter by client").
log.Printf("Received message from %s", v.Channel)
subscriptions := ps.GetSubscriptions(v.Channel, nil)
for _, sub := range subscriptions {
// Only the task-results and task-count channels have a sender; messages on
// any other channel are logged above and otherwise ignored. Each send runs
// in its own goroutine.
if v.Channel == types.TaskResults {
go sendTaskResultMessage(v.Data, sub)
} else if v.Channel == types.TaskCount {
go sendTaskCountMessage(v.Data, sub)
}
}
case redis.Subscription:
// Subscribe/unsubscribe notification for this connection; only logged.
log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
// Receive failed: log and leave the enclosing function, which ends delivery
// from this loop.
log.Println("Error pub/sub, delivery stopped")
return
}
Here's an example log output
go-1 | New Client is connected, total: 1
go-1 | 2022/02/16 17:36:03 signature is invalid
go-1 | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | 2022/02/16 17:36:06 Received message from task_count
go-1 | New Client is connected, total: 2
go-1 | 2022/02/16 17:36:14 signature is invalid
go-1 | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1
go-1 | 2022/02/16 17:36:16 Received message from task_count
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:16 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:21 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:26 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
go-1 | 2022/02/16 17:36:31 Received message from task_results
Any ideas what's going on?
Edited as per comment:
// PubSub holds the in-memory registry of connected websocket clients and the
// topic subscriptions they have made.
type PubSub struct {
// Clients lists the currently registered websocket clients.
Clients []Client
// Subscriptions lists every (topic, client) subscription; it is appended to
// by Subscribe and scanned by GetSubscriptions.
Subscriptions []Subscription
}
// Client is one connected websocket peer.
type Client struct {
// Id identifies the client; GetSubscriptions compares clients by this value.
Id string
// Connection is the upgraded websocket connection for this client.
Connection *websocket.Conn
}
// Message is the JSON envelope decoded from each websocket frame by
// HandleReceiveMessage.
type Message struct {
// Action selects the operation (publish, subscribe or unsubscribe).
Action string `json:"action"`
// Topic is the channel the action applies to.
Topic string `json:"topic"`
// Message is the raw, still-encoded payload forwarded on publish.
Message json.RawMessage `json:"message"`
// Token is passed to Subscribe, which resolves it to a user id.
Token string `json:"token"`
}
// Subscription records that a client is subscribed to a topic.
type Subscription struct {
// Topic is the subscribed channel name.
Topic string
// Client is the subscriber.
Client *Client
// UserId is the value utils.GetUser returned for the token supplied when
// subscribing.
UserId string
}
// GetSubscriptions returns the subscriptions registered for topic, in the
// order they were added. When client is non-nil only that client's
// subscriptions (matched by Id) are returned; when it is nil, subscriptions
// from every client are returned. The result is nil when nothing matches.
func (ps *PubSub) GetSubscriptions(topic string, client *Client) []Subscription {
	var matches []Subscription
	for _, sub := range ps.Subscriptions {
		// Filter by client first (when requested), then by topic.
		if client != nil && sub.Client.Id != client.Id {
			continue
		}
		if sub.Topic != topic {
			continue
		}
		matches = append(matches, sub)
	}
	return matches
}
Here's my websocket handler
func websocketHandler(w http.ResponseWriter, r *http.Request) {
gRedisConn, err := gRedisConn()
if err != nil {
log.Panic(err)
}
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := pubsub.Client{
Id: autoId(),
Connection: conn,
}
// add this client into the list
ps.AddClient(client)
fmt.Println("New Client is connected, total: ", len(ps.Clients))
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Println("Something went wrong", err)
ps.RemoveClient(client)
log.Println("total clients and subscriptions ", len(ps.Clients), len(ps.Subscriptions))
return
}
go listenToMessages()
ps.HandleReceiveMessage(client, messageType, p, gPubSubConn)
}
}
// HandleReceiveMessage decodes one websocket payload as a JSON Message and
// performs the action it names: publishing to a topic, subscribing the client
// to a topic, or (currently only logged) unsubscribing. Payloads that are not
// valid JSON are reported and ignored, as are unknown actions. It returns ps
// to allow chaining.
func (ps *PubSub) HandleReceiveMessage(client Client, messageType int, payload []byte, gPubSubConn *redis.PubSubConn) *PubSub {
	var msg Message
	if err := json.Unmarshal(payload, &msg); err != nil {
		fmt.Println("This is not correct message payload")
		return ps
	}

	switch msg.Action {
	case PUBLISH:
		ps.Publish(msg.Topic, msg.Message, nil)
	case SUBSCRIBE:
		ps.Subscribe(&client, msg.Topic, gPubSubConn, msg.Token)
	case UNSUBSCRIBE:
		// No subscription is removed here; the request is only reported.
		fmt.Println("Client want to unsubscribe the topic", msg.Topic, client.Id)
	}
	return ps
}
// Subscribe registers client for topic and subscribes the Redis pub/sub
// connection to that topic. If the client already has a subscription for the
// topic nothing is done. The token is resolved to a user id via utils.GetUser
// and stored on the subscription. A failure to subscribe on Redis panics via
// log.Panic. It returns ps to allow chaining.
func (ps *PubSub) Subscribe(client *Client, topic string, gPubSubConn *redis.PubSubConn, token string) *PubSub {
	if existing := ps.GetSubscriptions(topic, client); len(existing) != 0 {
		return ps
	}

	// Record the subscription locally before asking Redis for the channel.
	ps.Subscriptions = append(ps.Subscriptions, Subscription{
		Topic:  topic,
		Client: client,
		UserId: utils.GetUser(token),
	})

	err := gPubSubConn.Subscribe(topic)
	if err != nil {
		log.Panic(err)
	}
	return ps
}
CodePudding user response:
The immediate issue is caused by this line in websocketHandler:
gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}
This line replaces the current pubsub connection with a new connection. The new connection does not have any subscriptions. The previous connection is leaked.
Create the pubsub connection once at application startup.
The application has at least one data race. Run the application with the race detector and fix the reported problems.