Golang-Paho MQTT Subscriber keeps disconnecting with error EOF after reinitialization of subscriber

Time:11-02

I'm trying to change the MQTT client handlers and certificates dynamically, which causes the subscriber to disconnect with EOF while both the subscriber and the publisher are connected.

This is what I'm trying to do:

1] Initialize the subscriber/publisher (using firstPubHandler, firstConnectHandler and the default certificates).

2] Send a registration message to the server using the publisher to get the new certificate details.

3] The server responds with the certificate details on topic .../id/Certificate, which firstConnectHandler subscribes to.

4] firstPubHandler handles the server's response: it downloads the certificates and reinitializes the publisher/subscriber (using messagePubHandler, connectHandler and the newly downloaded certificates). connectHandler then listens for all topics under /id/.

Everything works, except that after I reinitialize the subscriber/publisher, the subscriber keeps disconnecting with the error "EOF".

Am I doing anything wrong here, or is there a better way to accomplish this? Any help is appreciated.

-- Main function

var opt Params
var publisher mqtt.Client
var subscriber mqtt.Client

func main() {
    InitializeBroker(firstPubHandler, firstConnectHandler)

    // Ultimately this triggers a message on the ".../id/Certificate" topic,
    // which arrives on the subscription made in firstConnectHandler.
    PublishRegistrationMessage(publisher)

    // Block until the process is interrupted. One buffered channel is enough;
    // an empty for {} goroutine would only burn a CPU core.
    done := make(chan os.Signal, 1)
    signal.Notify(done, os.Interrupt, syscall.SIGTERM)
    <-done
    DisconnectBrocker()
}

-- Handlers

// First handlers
var firstPubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    DownloadCertificates(msg.Payload())
    InitializeBroker(messagePubHandler, connectHandler)
}

var firstConnectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
    if token := c.Subscribe(opt.SubClientId+"/id/Certificate", 0, firstPubHandler); token.Wait() && token.Error() != nil {
        log.Error(token.Error())
    }
}

// Second handlers
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    ProcessMessage(msg.Payload())
}

var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
    if token := c.Subscribe(opt.SubClientId+"/id/ ", 0, messagePubHandler); token.Wait() && token.Error() != nil {
        log.Error(token.Error())
    }
}

// Common handler
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    log.Info(err)
}

-- Mqtt broker initialization

func InitializeBroker(lMessageHandler mqtt.MessageHandler, lConnectHandler mqtt.OnConnectHandler) {
    // Assumed intent: each client ID is the configured ID followed by "/id".
    statusPublishTopic := opt.PubClientId + "/id"
    nodeSubscribeTopic := opt.SubClientId + "/id"

    // Build the options for the publish client
    publisherOptions := mqtt.NewClientOptions()
    publisherOptions.AddBroker(opt.Broker)
    publisherOptions.SetClientID(statusPublishTopic)
    publisherOptions.SetDefaultPublishHandler(lMessageHandler)
    publisherOptions.OnConnectionLost = connectLostHandler

    // Build the options for the subscribe client
    subscriberOptions := mqtt.NewClientOptions()
    subscriberOptions.AddBroker(opt.Broker)
    subscriberOptions.SetClientID(nodeSubscribeTopic)
    subscriberOptions.SetDefaultPublishHandler(lMessageHandler)
    subscriberOptions.OnConnectionLost = connectLostHandler
    subscriberOptions.OnConnect = lConnectHandler

    if !opt.NoTLS {
        tlsconfig, err := NewTLSConfig()
        if err != nil {
            log.Fatal(err)
        }

        subscriberOptions.SetTLSConfig(tlsconfig)
        publisherOptions.SetTLSConfig(tlsconfig)
    }

    publisher = mqtt.NewClient(publisherOptions)
    if token := publisher.Connect(); token.Wait() && token.Error() != nil {
        log.Fatal(token.Error())
    }

    subscriber = mqtt.NewClient(subscriberOptions)
    if token := subscriber.Connect(); token.Wait() && token.Error() != nil {
        log.Fatal(token.Error())
    }
}

func NewTLSConfig() (config *tls.Config, err error) {
    certpool := x509.NewCertPool()
    pemCerts, err := ioutil.ReadFile(rootCert)
    if err != nil {
        return nil, err
    }
    certpool.AppendCertsFromPEM(pemCerts)

    cert, err := tls.LoadX509KeyPair(nodeCertFilePath, pvtKeyFilePath)
    if err != nil {
        return nil, err
    }

    config = &tls.Config{
        RootCAs:      certpool,
        ClientAuth:   tls.NoClientCert,
        ClientCAs:    nil,
        Certificates: []tls.Certificate{cert},
    }
    return config, nil
}

Answer:

Based on a quick review of your code this is what I believe is happening (as you have not provided all of the code there is a little guesswork involved):

  1. main() calls InitializeBroker, which creates two connections to the broker. The default publish handler is set to firstPubHandler, and in the OnConnect handler you subscribe to SubClientId + "/id/Certificate".
  2. When a message is received (firstPubHandler) you grab a certificate from the message and use it to establish a new set of connections to the broker, using the same client IDs but different OnConnect/default publish handlers.

So after point 2 you actually have two separate sets of connections to the broker (4 connections in total). However MQTT-3.1.4-2 (see the spec) states:

If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client.
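The takeover rule quoted above can be illustrated with a toy in-memory model. This is only a sketch: toyBroker, session and connect are hypothetical names invented for the illustration, and nothing here is paho or a real broker. It models one thing only, namely that a second connection with the same client ID closes the first.

```go
package main

import "fmt"

// session is a hypothetical stand-in for one client connection held by a broker.
type session struct {
	label  string // which set of connections this one belongs to
	closed bool   // set when the broker drops the connection
}

// toyBroker models only the quoted rule: at most one live session per client ID.
type toyBroker struct {
	sessions map[string]*session
}

func newToyBroker() *toyBroker {
	return &toyBroker{sessions: make(map[string]*session)}
}

// connect registers a session under clientID. If that ID is already in use,
// the existing session is closed first; the old client would observe this as
// a dropped connection.
func (b *toyBroker) connect(clientID, label string) *session {
	if old, ok := b.sessions[clientID]; ok {
		old.closed = true
	}
	s := &session{label: label}
	b.sessions[clientID] = s
	return s
}

func main() {
	b := newToyBroker()
	first := b.connect("node/id", "first set")
	second := b.connect("node/id", "second set") // same client ID
	fmt.Println(first.label, "closed:", first.closed)   // first set closed: true
	fmt.Println(second.label, "closed:", second.closed) // second set closed: false
}
```

Giving the second set of clients different client IDs would avoid the takeover, but the first set would then stay connected until it is closed explicitly.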

So when the second set of connections is established, the broker will drop the first set of connections. This is the 'EOF' disconnection you are seeing. The second set of connections will still be up. As you are using the same connectLostHandler for the first and second sets of connections, you cannot tell from the logs which connection is being terminated.
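One way to tell the connections apart in the logs is to build a separate connection-lost handler per client, each carrying its own label. The sketch below is standard-library only: labeledLostHandler is a hypothetical helper, and the type parameter C stands in for the client type so the example compiles without paho. With paho it would presumably be instantiated as labeledLostHandler[mqtt.Client]("first subscriber", log.Infof) and assigned to OnConnectionLost, assuming the logger's Infof takes a format string and arguments.

```go
package main

import (
	"fmt"
	"io"
)

// labeledLostHandler returns a handler that prefixes every connection-lost
// report with label. C is the client type passed to the callback; the handler
// ignores the client and only reports the label and the error.
func labeledLostHandler[C any](label string, logf func(format string, args ...any)) func(C, error) {
	return func(_ C, err error) {
		logf("%s: connection lost: %v", label, err)
	}
}

func main() {
	printf := func(format string, args ...any) { fmt.Printf(format+"\n", args...) }

	// struct{} is used here only because no real client type is available.
	first := labeledLostHandler[struct{}]("first subscriber", printf)
	second := labeledLostHandler[struct{}]("second subscriber", printf)

	first(struct{}{}, io.EOF)  // first subscriber: connection lost: EOF
	second(struct{}{}, io.EOF) // second subscriber: connection lost: EOF
}
```

With labels in place, an EOF reported only for the first set would support the explanation above, while an EOF on the second set would point to a different problem.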

In summary I believe your code is actually working. However you probably should call c.Disconnect() in firstConnectHandler so that the initial connection is cleanly closed before you establish the second set of connections. You would also need to store the publisher somewhere so you can disconnect that connection at the same time.
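A minimal sketch of that ordering (close the old connection, then open the new one) might look like the following. The disconnecter interface lists the single method the sketch needs; paho's mqtt.Client has a Disconnect(quiesce uint) method with this signature. Everything else (replaceClient, fakeClient, the 250 ms quiesce value) is a hypothetical name or value chosen for the illustration.

```go
package main

import "fmt"

// disconnecter is the only behaviour this sketch needs from a client.
type disconnecter interface {
	Disconnect(quiesce uint)
}

// replaceClient closes old (if there is one) and only then calls connectNew,
// so the broker never sees two live connections with the same client ID.
func replaceClient(old disconnecter, connectNew func() (disconnecter, error)) (disconnecter, error) {
	if old != nil {
		old.Disconnect(250) // allow up to 250 ms for in-flight work to finish
	}
	return connectNew()
}

// fakeClient records calls so the ordering can be observed without a broker.
type fakeClient struct {
	name string
	log  *[]string
}

func (f *fakeClient) Disconnect(quiesce uint) {
	*f.log = append(*f.log, "disconnect "+f.name)
}

func main() {
	var events []string
	old := &fakeClient{name: "first", log: &events}

	next, err := replaceClient(old, func() (disconnecter, error) {
		events = append(events, "connect second")
		return &fakeClient{name: "second", log: &events}, nil
	})
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	fmt.Println(events, next != nil)
}
```

In the question's code this would apply to both the publisher and the subscriber. Because the library's callbacks are not meant to block, it is probably safer to hand the swap to a separate goroutine rather than running it directly inside the message handler.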

Note: I'm struggling to understand why you are doing this. Establishing an initial connection to retrieve a certificate appears to decrease the overall security of your system. The standard approach would be to give each client a unique certificate and then use ACLs on the broker to apply whatever restrictions are necessary. With many brokers you can use the certificate common name in ACLs (thus removing the need for a second connection).
