Home > Software design >  Determinig end of stream in .Net for Network Stream Reading
Determinig end of stream in .Net for Network Stream Reading

Time:12-29

I start learning to work with socket recently. The behaviour I see in documentation is that when reading stream with a buffer, it return zero to indicate reader reached end of stream.

private static async Task ProcessStreamAsync(NetworkStream stream)
        {
            while (true)
            {   
                var buffer = new Byte[64];
                var received = await stream.ReadAsync(buffer);
                Console.WriteLine($"Count:{received}");    
                var message = Encoding.UTF8.GetString(buffer, 0, received);
                Console.WriteLine(message);
                if (received == 0) break; // This line does not reach ever in my case!                  
            }

        }

But it is not the case I see in my code. In the last reading loop, it returns a buffer which is not full and then wait infinitely. I try to handle it with comparing buffer length and buffer capacity.

private static async Task ProcessStreamAsync(NetworkStream stream)
        {
            while (true)
            {
                var buffer = new Byte[64];
                var received = await stream.ReadAsync(buffer);
                Console.WriteLine($"Count:{received}");
                var message = Encoding.UTF8.GetString(buffer, 0, received);
                Console.WriteLine(message);
                // if (received == 0) break;
                if (received < buffer.Length) { break; }

            }

        }

but this is not possible in case of using PipeReader which allocates buffer automatically and buffer size is not available in APIs.

private static async Task ProcessWithPipes(NetworkStream stream)
        {
            var reader = PipeReader.Create(stream);
            var writer = PipeWriter.Create(stream);    

            try
            {
                bool completed = false;
                do
                {
                    var result = await reader.ReadAsync();    
                    if (result.Buffer.Length == 0)
                    {
                        completed = true;
                        Console.WriteLine("Receive Empty Buffer, Client Closed");
                    }
                    var buffer = result.Buffer;
                    if (buffer.IsSingleSegment)
                    {
                        string message = Encoding.UTF8.GetString(buffer);
                        Console.WriteLine(message);
                    }
                    else
                    {
                        foreach (var item in buffer)
                        {
                            string message = Encoding.UTF8.GetString(item.Span);
                            Console.WriteLine(message);
                        }

                    }
                    if (result.IsCompleted)
                    {
                        completed = true;
                        Console.WriteLine("Stream Reading Completed, Client Closed");
                    }
                    var nextPosition = buffer.GetPosition(buffer.Length);
                    reader.AdvanceTo(nextPosition);

                } while (!completed);
                await reader.CompleteAsync();


            }
            catch (System.Exception)
            {

                throw;
            }
        }
    }

By the way I am using linux ubuntu for my development. Is this a normal behaviour?

CodePudding user response:

I think changing the method you use will also solve your problem. You can get healthier results by using the method I shared the link of. I will be glad if you inform me about the result.

Try This; https://stackoverflow.com/a/26059057/12645793

CodePudding user response:

After reading responses and comments, I found two way for solving this: 1- Use underlying network stream DataAvailable property instead of ReadResult IsCompleted or Buffer.Length==0 to break the while loop as below:

private static async Task ProcessWithPipes(NetworkStream stream)
        {
            var reader = PipeReader.Create(stream); 

            try
            {
                bool completed = false;
                do
                {
                    var result = await reader.ReadAsync();    
                    if (!stream.DataAvailable)
                    {
                        completed = true;
                        Console.WriteLine("Receive Empty Buffer, Client Closed");
                    }
                    var buffer = result.Buffer;
                    if (buffer.IsSingleSegment)
                    {
                        string message = Encoding.UTF8.GetString(buffer);
                        Console.WriteLine(message);
                    }
                    else
                    {
                        foreach (var item in buffer)
                        {
                            string message = Encoding.UTF8.GetString(item.Span);
                            Console.WriteLine(message);
                        }

                    }
                    if (result.IsCompleted)
                    {
                        completed = true;
                        Console.WriteLine("Stream Reading Completed, Client Closed");
                    }
                    var nextPosition = buffer.GetPosition(buffer.Length);
                    reader.AdvanceTo(nextPosition);

                } while (!completed);
                await reader.CompleteAsync();


            }
            catch (System.Exception)
            {

                throw;
            }
        }
    }

2- Use PipeWriter to fill pipe with network stream and then use PipeReader to read it

private static async Task ProcessStreamUsingPipe(Socket socket)
        {
            Pipe pipe = new Pipe();
            Task writing = FillPipeAsync(socket, pipe.Writer);
            Task reading = ReadPipeAsync(pipe.Reader);
            await Task.WhenAll(writing, reading);
        }
        private static async Task FillPipeAsync(Socket client, PipeWriter writer)
        {
            const int minimumBuffersize = 32;
            while (true)
            {
                var buffer = writer.GetMemory(minimumBuffersize);
                try
                {
                    int received = await client.ReceiveAsync(buffer, SocketFlags.None);
                    Console.WriteLine($"Received Count:{received}");
                    var result = await writer.FlushAsync();.
                    writer.Advance(received); 
                    if (received < buffer.Length) break;
                    if (result.IsCompleted)
                    {
                        break;
                    }

                }
                catch (System.Exception)
                {
                    throw;
                }
            }
            await writer.CompleteAsync(); 
        }
        private static async Task ReadPipeAsync(PipeReader reader)
        {

            while (true)
            {
                var result = await reader.ReadAsync();
                var buffer = result.Buffer;
                var msg = Encoding.UTF8.GetString(buffer);
                Console.WriteLine(msg);
                reader.AdvanceTo(buffer.Start);
                if (result.IsCompleted) break;
            }

            await reader.CompleteAsync();
        }
  • Related