Hold subscribe method with custom handler nats golang

Time:12-22

I am writing a wrapper on top of the NATS client in Go. The wrapper should accept a handler function from the consumer and invoke it once a message arrives from the NATS server. I want my custom Subscribe method to block until it receives a message from NATS.

Publish:

func (busConfig BusConfig) Publish(service string, data []byte) error {
    // conn.Publish already returns nil on success, so pass its result through.
    return conn.Publish(service, data)
}

Subscribe:

func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    fmt.Println("Subscribing on : ", subject)

    //wg := sync.WaitGroup{}
    //wg.Add(1)
    subscription, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
        }()
        //wg.Done()
    })
    if err != nil {
        fmt.Println("Subscriber error : ", err)
    }
    //wg.Wait()
    defer subscription.Unsubscribe()

}

Test case:

func TestLifeCycleEvent(t *testing.T) {
    busClient := GetBusClient()
    busClient.Subscribe(SUBJECT, func(input []byte) {
        fmt.Println("Life cycle event received :", string(input))
    })

    busClient.Publish(SUBJECT, []byte("complete notification"))
}

I can see that the message is published, but the subscriber never receives it. I tried to make the Subscribe method block using a WaitGroup, but I don't think that is the correct solution.

CodePudding user response:

You don't see the message being delivered because Subscribe is an async method that spawns a goroutine to handle the incoming messages and call the callback.

Straight after calling busClient.Publish() your application exits. It does not wait for anything to happen inside Subscribe().

Subscribe() on a NATS connection is normally used in a long-running application that exits only under specific conditions (like receiving a shutdown signal). A WaitGroup can work here, but it is better suited to tests than to real applications.

You should also call the Flush() method on the NATS connection to ensure all buffered messages have been sent before the program exits.

If you want a synchronous method, you can use SubscribeSync() on the connection and then read messages from the returned subscription with NextMsg().

Check out examples here: https://natsbyexample.com/examples/messaging/pub-sub/go
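
The "wait until the callback fires" idea can be sketched with a channel and a timeout instead of a WaitGroup. This is only an illustration under stated assumptions: bus, newBus, waitForFirst and defaultTimeout are hypothetical names, and bus is an in-memory stand-in for the NATS connection so the sketch runs without a server. With the real client, the same channel handoff would sit inside the callback passed to conn.Subscribe. Note also that in the question's wrapper, defer subscription.Unsubscribe() runs as soon as Subscribe returns, so the subscription is cancelled before any message can arrive.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// defaultTimeout bounds how long a caller waits for the first message.
const defaultTimeout = time.Second

// bus is a hypothetical in-memory stand-in for the NATS connection.
// Like an async NATS subscription, it runs handlers on another goroutine.
type bus struct {
	mu       sync.Mutex
	handlers map[string][]func([]byte)
}

func newBus() *bus {
	return &bus{handlers: make(map[string][]func([]byte))}
}

// Subscribe registers handler and returns immediately.
func (b *bus) Subscribe(subject string, handler func([]byte)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[subject] = append(b.handlers[subject], handler)
}

// Publish hands data to every registered handler, each on its own goroutine.
func (b *bus) Publish(subject string, data []byte) {
	b.mu.Lock()
	hs := append([]func([]byte){}, b.handlers[subject]...)
	b.mu.Unlock()
	for _, h := range hs {
		go h(data)
	}
}

// waitForFirst subscribes right away and returns a function that blocks
// until the first message arrives or the timeout expires.
func waitForFirst(b *bus, subject string, timeout time.Duration) func() ([]byte, error) {
	got := make(chan []byte, 1) // buffered so the handler never blocks
	b.Subscribe(subject, func(data []byte) {
		select {
		case got <- data:
		default: // a message is already waiting; later ones are dropped
		}
	})
	return func() ([]byte, error) {
		select {
		case data := <-got:
			return data, nil
		case <-time.After(timeout):
			return nil, errors.New("timed out waiting for message")
		}
	}
}

func main() {
	b := newBus()
	// Subscribe first, publish second, block last.
	wait := waitForFirst(b, "lifecycle", defaultTimeout)
	b.Publish("lifecycle", []byte("complete notification"))

	data, err := wait()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("Life cycle event received:", string(data))
}
```

Subscribing and waiting are split on purpose: the subscription has to exist before the publish, and the timeout keeps a test from hanging forever if nothing is delivered.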

CodePudding user response:

As I understand it, in NATS we need to respond to the message even if no reply address was provided, so the handler below responds after processing the message.

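func (busConfig BusConfig) Subscribe(subject string, handler func(msg []byte)) {
    // The subscription value is not used here, so it is discarded to keep the code compiling.
    _, err := conn.Subscribe(subject, func(msg *nats.Msg) {
        go func() {
            handler(msg.Data)
            msg.Respond(nil)
        }()
    })
    if err != nil {
        fmt.Println("Subscriber error : ", err)
    }
}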