Home > database >  Get message twice with MQTT with C# uPLibrary.Networking.M2Mqtt
Get message twice with MQTT with C# uPLibrary.Networking.M2Mqtt

Time:10-24

My problem is, it all works when transmitter and Subscribe are connected. All messages are transmitted correctly.

But if I disconnect the connection, so the Subscribe is no longer connected and then reconnect.

I get all the messages in between but the last one Double.

The double received is wrong. Why is it like that ? And how can I solve this?

Publisher

public class Programm
{

    static MqttClient mqttClient;
    static async Task Main(string[] args)
    {
        var locahlost = true;
        var clientName = "Sender 1";

        Console.WriteLine($"{clientName} Startet");

        var servr = locahlost ? "localhost" : "test.mosquitto.org";
        mqttClient = new MqttClient(servr);
        mqttClient.Connect(clientName, null, null, false, 60);

        Task.Run(() =>
        {
            if (mqttClient != null && mqttClient.IsConnected)
            {
                for (int i = 0; i < 100; i  )
                {
                    var Message = $"{clientName} ->Test {i}";
                    mqttClient.Publish("Application1/NEW_Message", Encoding.UTF8.GetBytes($"{Message}"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, true);
                    Console.WriteLine(Message);
                    Thread.Sleep(i * 1000);
                }
            }
        });

        Console.WriteLine($"{clientName} End");
    }
}

subscriber

public class Programm
{
    static MqttClient mqttClient;


    static async Task Main(string[] args)
    {
        var clientName = "subscriber 1";
        var locahlost = true;

        Console.WriteLine($"Start of {clientName}");
      

        Task.Run(() =>
        {
            var servr = locahlost ? "localhost" : "test.mosquitto.org";
            mqttClient = new MqttClient(servr);
         
            mqttClient.MqttMsgPublishReceived  = MqttClient_MqttMsgPublishReceived;
            mqttClient.Subscribe(new string[] { "Application1/NEW_Message" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            mqttClient.Connect(clientName, null, null, false, 60);
            //mqttClient.Connect(clientName);
        });
        
        //  client.UseConnecedHandler(e=> {Console.WriteLine("Verbunden") });
        Console.ReadLine();
        Console.WriteLine($"end of  {clientName}");

        Console.ReadLine();


    }
    private static void MqttClient_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
    {
       
        var message = Encoding.UTF8.GetString(e.Message);
        Console.WriteLine(message);
    }


}

server

static async Task Main(string[] args)
{
    Console.WriteLine("Server");
    MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
                                 // set endpoint to localhost
                                 .WithDefaultEndpoint()
                                 // port used will be 707
                                 .WithDefaultEndpointPort(1883)

                                 // handler for new messages
                                 .WithConnectionValidator(OnNewConnection)

                                 .WithApplicationMessageInterceptor(OnNewMessage)
                                 .WithClientMessageQueueInterceptor(OnOut)
                                 .WithDefaultCommunicationTimeout(TimeSpan.FromMinutes(5))
                                 .WithMaxPendingMessagesPerClient(10)
                                 .WithPersistentSessions()
                                 .WithStorage(storage)
                                 ;
    // creates a new mqtt server     
    IMqttServer mqttServer = new MqttFactory().CreateMqttServer();



    // start the server with options  
    mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();

    // keep application running until user press a key
    Console.ReadLine();
}

private static void OnOut(MqttClientMessageQueueInterceptorContext context)
{
    var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);
    Out_MessageCounter  ;
    var messageValue = Encoding.UTF8.GetString(context?.ApplicationMessage?.Payload);
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
    Console.WriteLine("Messsage OUT :->");
    Console.WriteLine($"MessageId: {Out_MessageCounter} - TimeStamp: {DateTime.Now} -- Message: ClientId = {context.ReceiverClientId}, Topic = {context.ApplicationMessage?.Topic}, Payload = {payload}, QoS = {context.ApplicationMessage?.QualityOfServiceLevel}, Retain-Flag = {context.ApplicationMessage?.Retain} Message {messageValue}");
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");


}

public static void OnNewConnection(MqttConnectionValidatorContext context)
{
    Console.WriteLine(
            $"New connection: ClientId = {context.ClientId}, Endpoint = {context.Endpoint}");
}

public static void OnNewMessage(MqttApplicationMessageInterceptorContext context)
{
    var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);

    In_MessageCounter  ;
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
    Console.WriteLine("Messsage IN :->");
    Console.WriteLine($"MessageId: {In_MessageCounter} - TimeStamp: {DateTime.Now} -- Message: ClientId = {context.ClientId}, Topic = {context.ApplicationMessage?.Topic}, Payload = {payload}, QoS = {context.ApplicationMessage?.QualityOfServiceLevel}, Retain-Flag = {context.ApplicationMessage?.Retain} Message {Encoding.UTF8.GetString(context?.ApplicationMessage?.Payload)}");
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
    Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");


}

CodePudding user response:

From the source:

/// Publish a message asynchronously
/// <param name="topic">Message topic</param>  
/// <param name="message">Message data (payload)</param> 
/// <param name="qosLevel">QoS Level</param> 
/// <param name="retain">Retain flag</param>  
/// <returns>Message Id related to PUBLISH message</returns>  
public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)

You probably want to set the last value to false

mqttClient.Publish("Application1/NEW_Message", Encoding.UTF8.GetBytes($"{Message}"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);

For more details about retained messages see here:

https://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages/

  • Related