RabbitMQ Closes connection after publish

Time:11-27

I am using RabbitMQ from Go, and I have run into a problem I have never seen before: the connection keeps closing after a single Publish. This is the snippet:


func (k *K) DoWork() {
    [...]
    go func() {
        for {
            time.Sleep(time.Second * 5)
            k.eventLock.Lock()
            if !k.connected {
                fmt.Printf("Not connected, skip\n")
                k.eventLock.Unlock()
                continue
            }
            k.eventLock.Unlock()
            err = k.outChannel.Publish(k.outExchangeName, "", true, true, amqp.Publishing{
                ContentType: "application/json",
                Body:        content,
            })

            if err != nil {
                fmt.Printf("Error sending status - %v\n", err)
                for {
                    err = k.initOutConnection()
                    if err != nil {
                        fmt.Printf("An error occurred initOut - %v\n", err)
                    } else {
                        fmt.Printf("Restablished connection\n")
                        break
                    }
                }
                continue
            } else {
                fmt.Printf("Sent keep alive\n")
            }
        }
    }()
}

func (k *K) initOutConnection() error {
    var err error
    k.outConnection, err = k.getRabbitConnection()

    if err != nil {
        return err
    }

    k.outChannel, err = k.outConnection.Channel()
    if err != nil {
        return err
    }

    k.outExchangeName = os.Getenv("RABBIT_MQ_OUT_EXCHANGE_NAME")

    err = k.outChannel.ExchangeDeclare(
        k.outExchangeName,
        "fanout",
        true,
        true,
        false,
        true, nil)

    if err != nil {
        return err
    }

    return nil
}

This is the output:

Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection

It is a perfect cycle: one successful send, the connection closes, I re-establish the connection, then again one success and one failure. But that is only how it looks from the producer application; the consumer never received any message.

And if I deliberately close the channel and reinitialize the connection after each publish, it does work:

err = k.outChannel.Publish()
if err == nil {
    k.outChannel.Close()
    k.initOutConnection()
    fmt.Printf("Sent keep alive\n")
}

It produces a consistent output:

Sent keep alive
Sent keep alive
Sent keep alive

But I want to use a single connection for all my publishes. Does anyone know what I am doing wrong?

CodePudding user response:

Which amqp library do you use? Is it "github.com/streadway/amqp"?

I don't think the connection is closed after you send a message, unless you close it somewhere in your code or something else in your implementation is wrong. What could be happening is that the outgoing channel cannot send the message because it was put into confirmation mode.

Here's an excerpt from the documentation:

Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.

Doc:

https://pkg.go.dev/github.com/streadway/amqp#Channel.Publish

So usually the publish looks like:

confirms := ch.NotifyPublish(...) // ch is the *amqp.Channel, after ch.Confirm(false)
err := ch.Publish(...)

// wait for the confirmation on confirms

Example from the repository: https://github.com/streadway/amqp/blob/master/_examples/simple-producer/producer.go

CodePudding user response:

After 1.5 days on this, it finally occurred to me to watch the broker's log output by running the RabbitMQ Docker container without detaching from it:

docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Then I ran the snippet above and, sure enough, an error appeared:

Error on AMQP connection <0.1254.0> (172.17.0.1:41690 -> 172.17.0.2:5672, vhost: '/', user: 'guest', state: running), channel 1:
 operation basic.publish caused a connection exception not_implemented: "immediate=true"

The issue was the fourth argument to .Publish(), which is immediate: it should be false instead of true. The connection really was up and working, but as soon as I called Publish with immediate=true the broker closed it. Although Publish itself returned no error, the message never reached its destination exchange.

That simple: a single boolean cost me countless hours.
