I recently started learning to work with sockets. According to the documentation, when reading a stream into a buffer, the read returns zero to indicate that the reader has reached the end of the stream.
/// <summary>
/// Reads <paramref name="stream"/> in 64-byte chunks, printing the byte count and the
/// UTF-8 decoded text of every chunk, and stops after a read that yields zero bytes.
/// </summary>
private static async Task ProcessStreamAsync(NetworkStream stream)
{
    int bytesRead;
    do
    {
        // A fresh chunk is allocated for every read.
        var chunk = new byte[64];
        bytesRead = await stream.ReadAsync(chunk);
        Console.WriteLine($"Count:{bytesRead}");
        Console.WriteLine(Encoding.UTF8.GetString(chunk, 0, bytesRead));
    }
    while (bytesRead != 0); // This exit condition is never reached in my case!
}
But that is not what I see in my code. On the last iteration of the read loop, the read returns a buffer that is not full, and the next read then waits indefinitely. I tried to handle this by comparing the number of bytes received with the buffer capacity.
/// <summary>
/// Reads <paramref name="stream"/> in 64-byte chunks, printing the byte count and the
/// UTF-8 decoded text of every chunk, and stops after the first read that does not
/// fill the whole chunk.
/// </summary>
private static async Task ProcessStreamAsync(NetworkStream stream)
{
    const int chunkSize = 64;
    int bytesRead;
    do
    {
        var chunk = new byte[chunkSize];
        bytesRead = await stream.ReadAsync(chunk);
        Console.WriteLine($"Count:{bytesRead}");
        Console.WriteLine(Encoding.UTF8.GetString(chunk, 0, bytesRead));

        // Waiting for a zero-byte read never fired in my case, so a short read is
        // used as the stop signal instead.
        // NOTE(review): a short read only means fewer bytes were available at that
        // moment; it is not an end-of-stream indicator.
    }
    while (bytesRead >= chunkSize);
}
However, this is not possible when using PipeReader, which allocates the buffer automatically; the buffer size is not exposed by its API.
/// <summary>
/// Reads <paramref name="stream"/> through a <see cref="PipeReader"/> and prints every
/// received buffer as UTF-8 text until the reader reports an empty buffer or completion.
/// </summary>
/// <param name="stream">The connected network stream to read from.</param>
private static async Task ProcessWithPipes(NetworkStream stream)
{
    var reader = PipeReader.Create(stream);
    try
    {
        var completed = false;
        while (!completed)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;

            if (buffer.IsEmpty)
            {
                completed = true;
                Console.WriteLine("Receive Empty Buffer, Client Closed");
            }
            else
            {
                // Decode the whole sequence in one call: decoding segment by segment
                // would corrupt a multi-byte character that straddles two segments.
                Console.WriteLine(Encoding.UTF8.GetString(buffer));
            }

            if (result.IsCompleted)
            {
                completed = true;
                Console.WriteLine("Stream Reading Completed, Client Closed");
            }

            // Everything in the buffer has been printed, so mark all of it as consumed.
            reader.AdvanceTo(buffer.End);
        }
    }
    finally
    {
        // Complete the reader even when reading throws, so it is always released.
        await reader.CompleteAsync();
    }
}
}
By the way, I am using Ubuntu Linux for development. Is this normal behaviour?
CodePudding user response:
I think changing the method you use will also solve your problem. You can get more reliable results by using the method in the answer linked below. I would be glad if you let me know the result.
Try this: https://stackoverflow.com/a/26059057/12645793
CodePudding user response:
After reading the responses and comments, I found two ways to solve this:
1- Use the underlying network stream's DataAvailable property, instead of ReadResult.IsCompleted or Buffer.Length == 0, to break the while loop, as below:
/// <summary>
/// Reads <paramref name="stream"/> through a <see cref="PipeReader"/> and prints every
/// received buffer as UTF-8 text. The loop stops as soon as the stream reports no
/// further pending data after a read, or the reader reports completion.
/// </summary>
/// <param name="stream">The connected network stream to read from.</param>
private static async Task ProcessWithPipes(NetworkStream stream)
{
    var reader = PipeReader.Create(stream);
    try
    {
        var completed = false;
        while (!completed)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;

            // Stop once nothing more is waiting on the socket.
            // NOTE(review): DataAvailable only reports whether data is queued right
            // now; it can be false while the peer is still sending, so this is a
            // heuristic for "message finished", not proof that the client closed.
            if (!stream.DataAvailable)
            {
                completed = true;
                Console.WriteLine("Receive Empty Buffer, Client Closed");
            }

            if (!buffer.IsEmpty)
            {
                // Decode the whole sequence in one call: decoding segment by segment
                // would corrupt a multi-byte character that straddles two segments.
                Console.WriteLine(Encoding.UTF8.GetString(buffer));
            }

            if (result.IsCompleted)
            {
                completed = true;
                Console.WriteLine("Stream Reading Completed, Client Closed");
            }

            // Everything in the buffer has been printed, so mark all of it as consumed.
            reader.AdvanceTo(buffer.End);
        }
    }
    finally
    {
        // Complete the reader even when reading throws, so it is always released.
        await reader.CompleteAsync();
    }
}
}
2- Use a PipeWriter to fill a pipe with data from the network socket, and then use a PipeReader to read it:
/// <summary>
/// Connects <paramref name="socket"/> to a console printer through an in-memory pipe:
/// one task fills the pipe from the socket while another drains and prints it.
/// Completes when both tasks have finished.
/// </summary>
private static async Task ProcessStreamUsingPipe(Socket socket)
{
    var channel = new Pipe();

    // Both halves are started before awaiting so they run concurrently.
    Task[] pumps =
    {
        FillPipeAsync(socket, channel.Writer),
        ReadPipeAsync(channel.Reader),
    };

    await Task.WhenAll(pumps);
}
/// <summary>
/// Receives bytes from <paramref name="client"/> and writes them into
/// <paramref name="writer"/> until a receive does not fill the provided buffer or the
/// reading side of the pipe has completed, then completes the writer.
/// </summary>
/// <param name="client">The connected socket to receive from.</param>
/// <param name="writer">The pipe writer that receives the bytes.</param>
private static async Task FillPipeAsync(Socket client, PipeWriter writer)
{
    const int minimumBufferSize = 32;
    try
    {
        while (true)
        {
            // The pipe may hand out more memory than the requested minimum.
            var buffer = writer.GetMemory(minimumBufferSize);

            int received = await client.ReceiveAsync(buffer, SocketFlags.None);
            Console.WriteLine($"Received Count:{received}");

            // Commit the received bytes first, then flush; flushing before Advance
            // would not make the bytes just received visible to the reader.
            writer.Advance(received);
            var result = await writer.FlushAsync();

            // NOTE(review): treating a short read as "message finished" is a
            // heuristic. Only a zero-byte receive signals that the peer closed;
            // a short read just means fewer bytes were available at that moment.
            if (received < buffer.Length || result.IsCompleted)
            {
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Propagate the failure to the reader so it does not wait forever.
        await writer.CompleteAsync(ex);
        throw;
    }

    await writer.CompleteAsync();
}
/// <summary>
/// Drains <paramref name="reader"/>, printing each non-empty buffer as UTF-8 text,
/// until the writing side has completed; the reader is then completed.
/// </summary>
/// <param name="reader">The pipe reader to drain.</param>
private static async Task ReadPipeAsync(PipeReader reader)
{
    try
    {
        while (true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;

            if (!buffer.IsEmpty)
            {
                Console.WriteLine(Encoding.UTF8.GetString(buffer));
            }

            // Mark everything as consumed. Advancing to buffer.Start would consume
            // nothing, so the next ReadAsync would return the same bytes again.
            reader.AdvanceTo(buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    finally
    {
        // Complete the reader even when reading throws, so it is always released.
        await reader.CompleteAsync();
    }
}