Home > other >  RabbitMQ Queue Length is always 0
RabbitMQ Queue Length is always 0

Time:11-28

I was writing an application and I had this issue, looking the code over and over, nothing seems to be wrong, tested with the below basic snippet and the issue is reproducible .... RabbitMQ is saying the queue is always empty when it is not.

The below Golang snippet shows a producer sending messages more often than the consumer consuming them. The consumer is always active but sleeping longer to make the queue have messages in its backlog. Result? The consumer fetches messages each time it tries however the API is always saying there are no messages -> message count is 0.

package main

import (
    "encoding/json"
    "fmt"
    "github.com/streadway/amqp"
    "io/ioutil"
    "net/http"
    "testing"
    "time"
)
func main() {

    username := "guest"
    password := "guest"
    scheme := "amqp"
    rabbitMqHost := "localhost"
    port := "5672"

    connectionString := fmt.Sprintf("%s://%s:%s@%s:%s/", scheme, username, password, rabbitMqHost, port)

    conn, err := amqp.Dial(connectionString)

    if err != nil {
        panic(err)
    }

    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }

    exchangeName := "my-exchange"
    // Declare exchange
    err = ch.ExchangeDeclare(
        exchangeName, // name
        "fanout",     // type
        true,         // durable
        true,         // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )

    if err != nil {
        panic(err)
    }

    // Create first Queue
    queueName := "my-queue"
    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        true,      // delete when unsused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )

    if err != nil {
        panic(err)
    }

    // Bind Exchange to Queue
    err = ch.QueueBind(
        q.Name,       // queue name
        "",           // routing key
        exchangeName, // exchange
        false,
        nil,
    )

    // Listen
    eventQueue, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

    if err != nil {
        panic(err)
    }

    go func() {
        for a := range eventQueue {
            fmt.Printf("Received Event %s\n", string(a.Body))
            time.Sleep(time.Second * 4)
        }
    }()

    go func() {
        count := 0
        for {
            err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
                ContentType: "application/json",
                Body:        []byte(fmt.Sprintf("Message %d", count)),
            })

            fmt.Printf("Sent Message %d\n", count)
            count  
            if err != nil {
                panic(err)
            }
            time.Sleep(time.Second * 2)
        }
    }()

    for {
        httpRes, err := http.Get("http://guest:guest@localhost:15672/api/queues///my-queue")
        if err != nil {
            panic(err)
        }

        var resJson map[string]interface{}
        content, err := ioutil.ReadAll(httpRes.Body)
        if err != nil {
            panic(err)
        }
        httpRes.Body.Close()
        err = json.Unmarshal(content, &resJson)

        if err != nil {
            panic(err)
        }

        q2, err := ch.QueueDeclarePassive(
            queueName, // name
            true,      // durable
            true,      // delete when unsused
            false,     // exclusive
            false,     // no-wait
            nil,
        )
        fmt.Printf("Queue Len: %f - %d\n", resJson["messages"], q2.Messages)
        time.Sleep(time.Second)
    }

}

You can test with the following RabbitMQ Server:

docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Output:

Sent Message 0
Received Event Message 0
Queue Len: %!f(<nil>) - 0
Queue Len: %!f(<nil>) - 0
Sent Message 1
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 1
Sent Message 2
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 3
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 2
Sent Message 4
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 5
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 3
Sent Message 6
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 7
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 4
Sent Message 8
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Sent Message 9
Queue Len: 0.000000 - 0
Queue Len: 0.000000 - 0
Received Event Message 5
Sent Message 10
....
....

Not a single time the Que Len has said it is not 0. In my application when I put a bunch of messages I may catch it saying X, but quickly it becomes 0, I thought I had a hidden consumer in the app, but no, the API is giving some results that either are not accurate, or I should be looking somewhere else to get the length.

Update

The above only happens when there is a consumer, if the queue does not have a consumer, it works as expected, just comment the .Consume code:

/*
eventQueue, err := ch.Consume(
...
go func(){
for a := range eventQueue {
..
}()
*/

And now it "improves", but first, it is not what I am looking for, second, it is still strange output =S:

Sent Message 0
Queue Len: 0.000000 - 1
Queue Len: 0.000000 - 1
Sent Message 1
Queue Len: 0.000000 - 2
Queue Len: 0.000000 - 2
Sent Message 2
Queue Len: 0.000000 - 3
Queue Len: 1.000000 - 3
Sent Message 3
Queue Len: 1.000000 - 4
Queue Len: 1.000000 - 4
Sent Message 4
Queue Len: 1.000000 - 5
Queue Len: 1.000000 - 5
Sent Message 5
Queue Len: 4.000000 - 6
Queue Len: 4.000000 - 6
Sent Message 6
Queue Len: 4.000000 - 7
Queue Len: 4.000000 - 7
Sent Message 7
Queue Len: 4.000000 - 8
Queue Len: 6.000000 - 8
Sent Message 8
Queue Len: 6.000000 - 9
Queue Len: 6.000000 - 9
Sent Message 9
Queue Len: 6.000000 - 10
Queue Len: 6.000000 - 10
Sent Message 10
Queue Len: 9.000000 - 11
Queue Len: 9.000000 - 11

CodePudding user response:

The field q2.Messages is unreliable, it is the count of messages not awaiting acknowledgment, i.e. messages already ACK'ed.

Your consumer is declared with autoAck = true — i.e. noAck —, which means that no acknowledgements are expected, which means that there are zero messages already ACK'ed.

When you comment out the consumer, the number of acknowledged messages likely depends on the publisher buffer.

Getting a precise number of messages programmatically on a given queue with AMQP 0.9.1 is basically not possible. You may use the message_stats field in the management API instead:

http://localhost:15672/api/queues/vhost/queue_name

CodePudding user response:

The accepted solution will be blackgreen's. The proof is the below replacement, just replace the consumer and publisher code in the question section by:

// Listen
    eventQueue, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack <-- Difference
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

    if err != nil {
        panic(err)
    }

    go func() {

        for a := range eventQueue {
            err = ch.Ack(a.DeliveryTag, false) // <-- Difference
            if err != nil {
                panic(err)
            }
            fmt.Printf("Received Event %s\n", string(a.Body))
            time.Sleep(time.Second * 4)
        }
    }()

    go func() {
        count := 0
        for {
            err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{
                ContentType: "application/json",
                Body:        []byte(fmt.Sprintf("Message %d", count)),
            })

            fmt.Printf("Sent Message %d\n", count)
            count  
            if err != nil {
                panic(err)
            }
            if count >= 20 { // <-- Difference
                break
            }
            time.Sleep(time.Second * 2)
        }
    }()

Output:

.... The increase in the queue length
Sent Message 13
Queue Len: 8.000000 - 0
Queue Len: 8.000000 - 0
Received Event Message 4
Sent Message 14
Queue Len: 8.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 15
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 5
Sent Message 16
Queue Len: 9.000000 - 0
Queue Len: 9.000000 - 0
Sent Message 17
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 6
Sent Message 18
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Sent Message 19
Queue Len: 11.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 7
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 8
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Queue Len: 12.000000 - 0
Received Event Message 9
Queue Len: 12.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Received Event Message 10
Queue Len: 11.000000 - 0
Queue Len: 11.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Received Event Message 11
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 10.000000 - 0
Queue Len: 9.000000 - 0
Received Event Message 12
....
As publisher exits it decreases, the consumer catches up and message len decreases:
Received Event Message 16
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 5.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 17
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Queue Len: 4.000000 - 0
Received Event Message 18
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Queue Len: 2.000000 - 0
Received Event Message 19
Queue Len: 2.000000 - 0
Queue Len: 1.000000 - 0

  • Related