I am using RabbitMQ on Golang, and I am finding myself into some trouble I never was before. The connection keeps closing after one single Publish
. This is the snippet:
func (k*K) DoWork(){
[...]
go func() {
for {
time.Sleep(time.Second * 5)
k.eventLock.Lock()
if !k.connected {
fmt.Printf("Not connected, skip\n")
k.eventLock.Unlock()
continue
}
k.eventLock.Unlock()
err = k.outChannel.Publish(k.outExchangeName, "", true, true, amqp.Publishing{
ContentType: "application/json",
Body: content,
})
if err != nil {
fmt.Printf("Error sending status - %v\n", err)
for {
err = k.initOutConnection()
if err != nil {
fmt.Printf("An error occurred initOut - %v\n", err)
} else {
fmt.Printf("Restablished connection\n")
break
}
}
continue
} else {
fmt.Printf("Sent keep alive\n")
}
}
}()
}
func (k *K) initOutConnection() error {
var err error
k.outConnection, err = k.getRabbitConnection()
if err != nil {
return err
}
k.outChannel, err = k.outConnection.Channel()
if err != nil {
return err
}
k.outExchangeName = os.Getenv("RABBIT_MQ_OUT_EXCHANGE_NAME")
err = k.outChannel.ExchangeDeclare(
k.outExchangeName,
"fanout",
true,
true,
false,
true, nil)
if err != nil {
return err
}
return nil
}
This is the output:
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
Sent keep alive
Error sending status - Exception (504) Reason: "channel/connection is not open"
Restablished connection
It is a perfect cycle, one successful send, the connection closes, I establish the connection, then again one success, one failure. But this is to the eyes of the producer application, the consumer did not receive any message.
And if I on purpose close the connection after one publish, it does work:
err = k.outChannel.Publish()
if err == nil {
k.outChannel.Close()
k.initOutConnection()
fmt.Printf("Sent keep alive\n")
}
It produces a consistent output:
Sent keep alive
Sent keep alive
Sent keep alive
But I want to use a single connection for all my publish
, does someone know what am I doing wrong?
CodePudding user response:
Which amqp library do you use? Is it "github.com/streadway/amqp"?
I don't think the connection is closed after you send a message, unless you do close the connection in your code / have some wrong implementation. What could happen is that the outgoing channel is not able to send a message because it was put in confirmation mode.
Here's an excerpt from the documentation:
Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.
Doc:
https://pkg.go.dev/github.com/streadway/amqp#Channel.Publish
So usually the publish looks like:
channel := client.NotifyPublish(...)
client.Publish(...)
// wait for the channel
Example from the repository: https://github.com/streadway/amqp/blob/master/_examples/simple-producer/producer.go
CodePudding user response:
after 1.5 days on it, it finally came to my mind to enable debug logging in Docker RabbitMQ without detaching from it with:
docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
And ran the above snippet, sure enough an error appeared:
Error on AMQP connection <0.1254.0> (172.17.0.1:41690 -> 172.17.0.2:5672, vhost: '/', user: 'guest', state: running), channel 1:
operation basic.publish caused a connection exception not_implemented: "immediate=true"
The issue was the 4th Argument to .Publish()
which is immediate
, instead of true I should set it to false
...
The connection was really there up and working, as soon as I called Publish with immediate=true
it got closed the connection and although I did not get an error from Publish
the message did not reach its exchange destination.
That simple, a single boolean wasting me countless hours.