Process data asynchronously as it arrives from a server in order?

Time:03-03

I'm connecting to an XMPP chat server using NetCoreServer. Everything is working as intended. Whenever the server sends a message, I process it with a regular synchronous method, ProcessData(string data). The problem is that if this method takes longer than a certain amount of time, the server closes the connection.

I was thinking about executing the method asynchronously, but the problem is that messages coming from the server can be split into parts. ProcessData detects that: if the received message is only one part of the entire message, it stores it. The next time it's called, it appends the new part to the stored one and checks whether the message is now complete or whether it needs to wait for the next part, and so on, until it has a complete message. Only then does it continue processing it. So if it's called asynchronously, each call has to wait for the previous ones to finish before executing, without blocking NetCoreServer's OnReceive.

I am thinking of adding a var task = new Task(() => { ProcessData(result); }); to a queue whenever new data arrives from the server, but I don't know how to chain their execution or how to proceed. Alternatively, I could store the data in a queue as it arrives and somehow raise an event that calls ProcessData whenever a new message is added to the queue. But apart from not knowing how to do that, I have the same problem: each triggered event should wait for the previous ones to complete.

ProcessData looks something like this:

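public class DataProcessor
{
    private string Buffer = "";

    public void ProcessData(string data)
    {
        // Append the new chunk to the parts received so far
        Buffer += data;
        if (PartialData(Buffer))
        {
            // Incomplete message: keep it and wait for the next chunk
            return;
        }
        string message = Buffer;
        Buffer = "";
        // ...continue processing the complete message
    }

    // PartialData(string) is defined elsewhere (not shown)
}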
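
The task-chaining idea from the question can be sketched with Task.ContinueWith: keep a reference to the last scheduled task and attach each new piece of work as a continuation of it, so the receive callback only schedules the work and returns. This is a minimal sketch, not the approach the answer below uses; SerialTaskChain and Enqueue are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs actions one after another, in the order they
// were enqueued, without blocking the caller.
public sealed class SerialTaskChain
{
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask;

    // Safe to call from the receive callback: it only schedules work.
    // The lock guards _tail in case callbacks arrive on different threads.
    public Task Enqueue(Action action)
    {
        lock (_gate)
        {
            // The continuation starts only after the previous task has
            // finished, whether it succeeded or failed. An exception thrown
            // by action surfaces only through the returned Task.
            _tail = _tail.ContinueWith(
                _ => action(),
                CancellationToken.None,
                TaskContinuationOptions.None,
                TaskScheduler.Default);
            return _tail;
        }
    }
}
```

From the receive handler you would call something like chain.Enqueue(() => processor.ProcessData(data)), where processor is a single DataProcessor instance (again, illustrative names). Because each call runs only after the previous one has finished, the Buffer field is never touched by two calls at once.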

Answer:

There are many tools you could use to solve this problem. Here I'll show a TPL Dataflow solution. You will need two ActionBlock<T>s: one for joining the split parts of the messages, and one for processing the complete messages. I am writing them below in reverse order, because the first block needs a reference to the second block when it is constructed. This example assumes that the final part of each complete message ends with a dot character:

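using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var block2 = new ActionBlock<string[]>(messageParts =>
{
    string completeMessage = String.Join(" ", messageParts);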
    Console.WriteLine($"Processing message: {completeMessage}");
});

var parts = new List<string>();
var block1 = new ActionBlock<string>(rawMessage =>
{
    if (rawMessage is null) { block2.Complete(); return; }
    parts.Add(rawMessage);
    if (rawMessage.EndsWith("."))
    {
        block2.Post(parts.ToArray());
        parts.Clear();
    }
});

block1.Post("Hello");
block1.Post("world.");
block1.Post("The quick");
block1.Post("brown fox.");

block1.Post(null); // Signal that there are no more messages
block1.Complete();
await block2.Completion;
Console.WriteLine("Processing terminated");

Output:

Processing message: Hello world.
Processing message: The quick brown fox.
Processing terminated

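As you can see, a List<string> holds the parts of the message that is currently being received. Each time a message is completed, the parts are propagated to block2 and the list is cleared. The list needs no locking, because an ActionBlock<T> processes one message at a time by default (its MaxDegreeOfParallelism is 1).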

A special message with the value null signals that no more messages are going to be received, so that block1 can Complete block2 and you can await block2's Completion, for a clean and graceful termination of the process.
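
A variation on the same pipeline, not what the answer above uses, replaces the null sentinel with the library's built-in completion propagation: a TransformManyBlock<string, string> emits zero or one complete message per raw part, and linking it with PropagateCompletion = true completes the ActionBlock automatically once the joiner has been completed and has drained its queue. This is a sketch that keeps the same dot-terminator assumption; JoinerPipeline and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class JoinerPipeline
{
    // Hypothetical entry point: feeds raw parts through the pipeline and
    // returns the complete messages in the order they were processed.
    public static async Task<List<string>> RunAsync(IEnumerable<string> rawParts)
    {
        var processed = new List<string>();

        // Processes complete messages one at a time, in arrival order.
        var processor = new ActionBlock<string>(message =>
        {
            processed.Add(message);
            Console.WriteLine($"Processing message: {message}");
        });

        // Joins raw parts; assumes the last part of a message ends with a dot.
        var parts = new List<string>();
        var joiner = new TransformManyBlock<string, string>(rawMessage =>
        {
            parts.Add(rawMessage);
            if (!rawMessage.EndsWith(".")) return Array.Empty<string>();
            string completeMessage = String.Join(" ", parts);
            parts.Clear();
            return new[] { completeMessage };
        });

        // Completing the joiner will also complete the processor.
        joiner.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string raw in rawParts) joiner.Post(raw);

        joiner.Complete();          // no sentinel value needed
        await processor.Completion; // all complete messages have been processed
        return processed;
    }
}
```

With the same four inputs as the answer ("Hello", "world.", "The quick", "brown fox."), this sketch processes "Hello world." and then "The quick brown fox.".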

The two ActionBlock<T>s work in parallel with each other. Each one has its own internal input queue of messages to process (this queue is unbounded). The Post method just adds the message to the input queue of the target block, without waiting for the message to be processed. It returns true if the message is accepted by the target block, and false otherwise. Common reasons for not accepting a message are that the Complete method has been called, or that the block has failed because of an unhandled exception.
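
Both properties of Post can be checked with a small sketch: the action below blocks on a ManualResetEventSlim to simulate slow processing, yet Post still returns immediately, and a Post made after Complete is declined. PostDemo and Run are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class PostDemo
{
    // Returns the results of two Post calls: one before and one after Complete().
    public static (bool accepted, bool acceptedAfterComplete) Run()
    {
        using var gate = new ManualResetEventSlim(false);

        // The action waits on the gate, simulating slow processing.
        var block = new ActionBlock<string>(msg => gate.Wait());

        // Post returns right away even though processing is blocked:
        // the message only goes into the block's input queue.
        bool accepted = block.Post("first");

        // After Complete, the block declines every new message.
        block.Complete();
        bool acceptedAfterComplete = block.Post("second");

        gate.Set();              // let the pending message finish
        block.Completion.Wait(); // wait until the queue has drained
        return (accepted, acceptedAfterComplete);
    }
}
```

If Post waited for processing, Run would never reach gate.Set() and would hang; instead it returns (true, false). This is the property that keeps a receive callback fast when it only posts each incoming chunk.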

The TPL Dataflow library is part of the standard libraries in .NET 6, so you don't need to install anything to use it. If you are targeting the old .NET Framework, you need to install the System.Threading.Tasks.Dataflow NuGet package.