My problem is, it all works when transmitter and Subscribe are connected. All messages are transmitted correctly.
But if I disconnect the connection, so the Subscribe is no longer connected and then reconnect.
I get all the messages in between but the last one Double.
The double received is wrong. Why is it like that ? And how can I solve this?
Publisher
public class Programm
{
static MqttClient mqttClient;
static async Task Main(string[] args)
{
var locahlost = true;
var clientName = "Sender 1";
Console.WriteLine($"{clientName} Startet");
var servr = locahlost ? "localhost" : "test.mosquitto.org";
mqttClient = new MqttClient(servr);
mqttClient.Connect(clientName, null, null, false, 60);
Task.Run(() =>
{
if (mqttClient != null && mqttClient.IsConnected)
{
for (int i = 0; i < 100; i )
{
var Message = $"{clientName} ->Test {i}";
mqttClient.Publish("Application1/NEW_Message", Encoding.UTF8.GetBytes($"{Message}"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, true);
Console.WriteLine(Message);
Thread.Sleep(i * 1000);
}
}
});
Console.WriteLine($"{clientName} End");
}
}
subscriber
public class Programm
{
static MqttClient mqttClient;
static async Task Main(string[] args)
{
var clientName = "subscriber 1";
var locahlost = true;
Console.WriteLine($"Start of {clientName}");
Task.Run(() =>
{
var servr = locahlost ? "localhost" : "test.mosquitto.org";
mqttClient = new MqttClient(servr);
mqttClient.MqttMsgPublishReceived = MqttClient_MqttMsgPublishReceived;
mqttClient.Subscribe(new string[] { "Application1/NEW_Message" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
mqttClient.Connect(clientName, null, null, false, 60);
//mqttClient.Connect(clientName);
});
// client.UseConnecedHandler(e=> {Console.WriteLine("Verbunden") });
Console.ReadLine();
Console.WriteLine($"end of {clientName}");
Console.ReadLine();
}
private static void MqttClient_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
{
var message = Encoding.UTF8.GetString(e.Message);
Console.WriteLine(message);
}
}
server
static async Task Main(string[] args)
{
Console.WriteLine("Server");
MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
// set endpoint to localhost
.WithDefaultEndpoint()
// port used will be 707
.WithDefaultEndpointPort(1883)
// handler for new messages
.WithConnectionValidator(OnNewConnection)
.WithApplicationMessageInterceptor(OnNewMessage)
.WithClientMessageQueueInterceptor(OnOut)
.WithDefaultCommunicationTimeout(TimeSpan.FromMinutes(5))
.WithMaxPendingMessagesPerClient(10)
.WithPersistentSessions()
.WithStorage(storage)
;
// creates a new mqtt server
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
// start the server with options
mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();
// keep application running until user press a key
Console.ReadLine();
}
private static void OnOut(MqttClientMessageQueueInterceptorContext context)
{
var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);
Out_MessageCounter ;
var messageValue = Encoding.UTF8.GetString(context?.ApplicationMessage?.Payload);
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
Console.WriteLine("Messsage OUT :->");
Console.WriteLine($"MessageId: {Out_MessageCounter} - TimeStamp: {DateTime.Now} -- Message: ClientId = {context.ReceiverClientId}, Topic = {context.ApplicationMessage?.Topic}, Payload = {payload}, QoS = {context.ApplicationMessage?.QualityOfServiceLevel}, Retain-Flag = {context.ApplicationMessage?.Retain} Message {messageValue}");
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
}
public static void OnNewConnection(MqttConnectionValidatorContext context)
{
Console.WriteLine(
$"New connection: ClientId = {context.ClientId}, Endpoint = {context.Endpoint}");
}
public static void OnNewMessage(MqttApplicationMessageInterceptorContext context)
{
var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);
In_MessageCounter ;
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
Console.WriteLine("Messsage IN :->");
Console.WriteLine($"MessageId: {In_MessageCounter} - TimeStamp: {DateTime.Now} -- Message: ClientId = {context.ClientId}, Topic = {context.ApplicationMessage?.Topic}, Payload = {payload}, QoS = {context.ApplicationMessage?.QualityOfServiceLevel}, Retain-Flag = {context.ApplicationMessage?.Retain} Message {Encoding.UTF8.GetString(context?.ApplicationMessage?.Payload)}");
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
Console.WriteLine("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------");
}
CodePudding user response:
From the source:
/// Publish a message asynchronously /// <param name="topic">Message topic</param> /// <param name="message">Message data (payload)</param> /// <param name="qosLevel">QoS Level</param> /// <param name="retain">Retain flag</param> /// <returns>Message Id related to PUBLISH message</returns> public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
You probably want to set the last value to false
mqttClient.Publish("Application1/NEW_Message", Encoding.UTF8.GetBytes($"{Message}"), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
For more details about retained messages see here:
https://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages/