Home > Net >  Message still in nats limit queue after ack and term sent in Go
Message still in nats limit queue after ack and term sent in Go

Time:09-14

I tried writing a subscriber for a NATS limit queue:

sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
    return err
}

msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
    if errors.Is(err, nats.ErrSlowConsumer) {
        log.Printf("Slow consumer error returned. Waiting for reset...")
        time.Sleep(50 * time.Millisecond)
        continue
    } else {
        return err
    }
}

msg.InProgress()

var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
    msg.Term()
    return err
}

actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
    msg.Nak()
    continue
}

callback, err := handler(&message)
if err == nil {
    msg.Ack()
    msg.Term()
} else {
    msg.Nak()
    return err
}

callback(ctx)

The goal of this code is consume any message on a number of subjects and call a callback function associated with the subject. This code works but the issue I'm running into is that I'd like the message to be deleted after the call to handler if that function doesn't return an error. I thought that's what msg.Term was doing but I still see all the messages in the queue.

I had originally designed this around a work queue but I wanted it to work with multiple subscribers so I had to redesign it. Is there any way to make this work?

CodePudding user response:

Based on the code provided, I assume that you are not providing stream and consumer info when creating a subscription with the JetStream library.

In the documentation for the SubscribeSync method, it says that when stream and consumer information is not provided, the library will create an ephemeral consumer and the name of the consumer is picked by the server. It also attempts to figure out which stream the subscription is for.

Here is what I believe happens in your code:

  • When you call the SubscribeSync method, an ephemeral consumer is created, with your provided topic.
  • When msg.Ack and msg.Term are called, you do acknowledge the message, but only for that current consumer.
  • The next time you call the SubscribeSync method, a new ephemeral consumer is created, containing the message that you already deleted on another consumer. Which is how the Jetstream concepts of streams, consumers, and subscriptions work by design.

Based on what you want to accomplish, here are some suggestions:

  • Use the plain NATS Core library to work with either a pub/sub or a queue. Don't use JetStream. The NATS Core library works with topics directly, whereas the Jetstream library creates additional things (streams and consumers) under the hood if the information is not provided.
  • Use JetStream but create a stream and a durable consumer yourself, either through code or directly on the NATS server. This way, with a stream and a consumer already defined, you should be able to make it work as intended.
  • Related