Home > Enterprise >  How to read continuously and asynchronously from network stream using abstract stream?
How to read continuously and asynchronously from network stream using abstract stream?

Time:08-17

Description

I would like to read asynchronously from a NetworkStream or SSLStream using their abstract Stream parent class. There are different ways to read asynchronously from stream:

  • Asynchronous Programming Model (APM): It uses BeginRead and EndRead operations.
  • Task Parallel Library (TPL): It uses Task and creates task continuations.
  • Task-based Asynchronous Pattern (TAP): Operations are suffixed Async and async and await keyword can be used.

I am mostly interested using the TAP pattern to achieve asynchronous read operation. Code below, asynchronously read to the end of stream and returns with the data as byte array:

    internal async Task<byte[]> ReadToEndAsync(Stream stream)
    {
        byte[] buffer = new byte[4096];
        using (MemoryStream memoryStream = new MemoryStream())
        {
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            while (bytesRead != 0)
            {
                // Received datas were aggregated to a memory stream.
                await memoryStream.WriteAsync(buffer, 0, bytesRead);
                bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            }

            return memoryStream.ToArray();
        }
    }

The buffer size is 4096kB. If the transferred data is more than the buffer size then it continues to read until 0 (zero), the end of the stream. It works with FileStream but it hangs indefinitely at the ReadAsync operation using NetworkStream or SslStream. These two stream behave differently than other streams.

The problem lies behind that the network stream ReadAsync will only return with 0 (zero) when the Socket communication is closed. However I do not want to close the communication every time a data is transferred through the network.

Question

How can I avoid the blocking call of the ReadAsync without closing the Socket communication?

CodePudding user response:

I would like to read asynchronously from a NetworkStream or SSLStream using their abstract Stream parent class.

Do you really need to? TCP/IP socket communication is quite complex to do correctly. I strongly recommend self-hosting an HTTP API or something like that instead.

The problem lies behind that the network stream ReadAsync will only return with 0 (zero) when the Socket communication is closed.

Yes, and the code you posted only returns a result when the entire stream has been read.

So, what you need is a different return condition. Specifically, you need to know when a single message has been read from the stream. What you need is message framing, as described on my blog.

I also have some sample code on my blog showing one of the simplest message framing solutions (length prefix). Take careful note of the complexity and length of the simplest solution, and consider whether you really want to write socket-level code.

If you do want to continue writing socket applications, I recommend watching my YouTube series on asynchronous TCP/IP socket programming.

CodePudding user response:

As Stephen Cleary blog post pointed out there are two approaches used for message framing: length prefixing and delimiters.

  • Length prefixing: Message length was known, because it was sent through the network.
  • Delimiters: Message length was unknown. Escaping was determined by the delimiter characters

Code below uses delimiter characters <EOF> in order to decide whenever return with the current transmitted message.

internal static readonly byte[] EOF = Encoding.UTF8.GetBytes("<EOF>");

internal async Task<byte[]> ReadToEOFAsync(Stream stream)
{
    byte[] buffer = new byte[8192];
    using (MemoryStream memoryStream = new MemoryStream())
    {
        long eofLength = EOF.LongLength;
        byte[] messageTail = new byte[eofLength];
        while (!messageTail.SequenceEqual(EOF))
        {
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            await memoryStream.WriteAsync(buffer, 0, bytesRead);

            Array.Copy(memoryStream.GetBuffer(), memoryStream.Length - eofLength, messageTail, 0, eofLength);
        }

        // Truncate the EOF tail from the message stream
        byte[] result = new byte[memoryStream.Length - eofLength];
        Array.Copy(memoryStream.GetBuffer(), 0, result, 0, result.LongLength);

        return result;
    }
}

The bufferTail is an additional variable to track the delimiter characters in the received message. Slicing the end of the buffer was achieved by the Array.Copy method. Read until the <EOF> delimiter characters were reached.

Final step is to truncate the delimiter characters from the received message. It is also accomplished by copying the appropriate number of bytes into a result array. There is a more sophisticated way of slicing using Span introduced in .NET Core 2.1.

Furthermore the writers need to include the delimiter characters writing after sending a message.

await stream.WriteAsync(message, 0, message.Length);
await stream.WriteAsync(EOF, 0, EOF.Length);

Note considering to prevent DOS attack:

Whether using length-prefixing or delimiters, one must include code to prevent denial of service attacks. Length-prefixed readers can be given a huge message size; delimiting readers can be given a huge amount of data without delimiters.

  • Related