gRPC file upload always returns The request stream was aborted

Time:11-09

I'm trying to upload a file from the client to the server in chunks. It's been ten years since I last tried to code a chunk uploader myself, and this is harder than I remember.

upload.proto

syntax = "proto3";

option csharp_namespace = "GrpcService1";

package upload;

// The file upload service definition.
service Uploader {
  // Upload a file as a stream of chunks
  rpc UploadFileStream(stream UploadFileRequest) returns (UploadFileResponse) {}
}

// The request message containing file data, and file name
message UploadFileRequest {
  bytes data = 1;
  string fileName = 2;
}

// The response from the upload containing the filePath
message UploadFileResponse {
  string filePath = 1;
}

UploaderService

using Grpc.Core;

namespace GrpcService1.Services;

public class UploaderService : Uploader.UploaderBase
{
    private readonly ILogger<GreeterService> _logger;

    public UploaderService(ILogger<GreeterService> logger)
    {
        _logger = logger;
    }
    
    public override async Task<UploadFileResponse> UploadFileStream(IAsyncStreamReader<UploadFileRequest> request, ServerCallContext context)
    {
        while (await request.MoveNext())
        {
            Console.WriteLine(request.Current.Data);
        }

        return new UploadFileResponse(){ FilePath = @"c:\uploaded\file1.txt"};
    }
}

client upload test method

 public static async Task Upload(GrpcChannel channel, string filePath)
    {
        var clientUpFile = new Uploader.UploaderClient(channel);
        using var uploadStream = clientUpFile.UploadFileStream();

        var x = File.Exists(filePath);
        
        using (FileStream fileStream = File.OpenRead(filePath) )
        {
            var bufferSize = 512000;
            var buffer = new byte[bufferSize];
            var lastBytesRead = 0;
            var byteCount = 0;

            while ((lastBytesRead = fileStream.Read(buffer, 0, bufferSize)) != 0)
            {
                Console.WriteLine(lastBytesRead);
                
                if (lastBytesRead > 0)
                {
                    await uploadStream.RequestStream.WriteAsync(new UploadFileRequest()
                    {
                        Data = await ByteString.FromStreamAsync(fileStream, CancellationToken.None),
                        FileName = filePath
                    });
                }
            }
        }
    }

Issue

Judging by its console log, the client seems to be uploading the chunks:

Hello World
[ { "name": "Files\DaimtoLogo.jpg" } ]
512

However the server seems to get a single chunk, which makes sense as there appears to be only one. Then it throws an error reading the stream.

Google.Protobuf.ByteString
info: Grpc.AspNetCore.Server.ServerCallHandler[14]
      Error reading message.
System.IO.IOException: The request stream was aborted.
 ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted.
 ---> Microsoft.AspNetCore.Connections.ConnectionResetException: An existing connection was forcibly closed by the remote host.
 ---> System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
   at Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal.SocketConnection.DoReceive()
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.DuplexPipeStream.ReadAsyncInternal(Memory1 destination, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder1.StateMachineBox1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory1 buffer)
   at System.IO.Pipelines.StreamPipeReader.g__Core|36_0(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ReadInputAsync()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ProcessRequestsAsync[TContext](IHttpApplication1 application)
   --- End of inner exception stack trace ---
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder1.StateMachineBox1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadStreamMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func2 deserializer, CancellationToken cancellationToken)
fail: Grpc.AspNetCore.Server.ServerCallHandler[6]
      Error when executing service method 'UploadFileStream'.
System.IO.IOException: The request stream was aborted.
 ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted.
 ---> Microsoft.AspNetCore.Connections.ConnectionResetException: An existing connection was forcibly closed by the remote host.
 ---> System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
   at Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal.SocketConnection.DoReceive()
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.DuplexPipeStream.ReadAsyncInternal(Memory1 destination, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder1.StateMachineBox1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory1 buffer)
   at System.IO.Pipelines.StreamPipeReader.g__Core|36_0(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ReadInputAsync()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2Connection.ProcessRequestsAsync[TContext](IHttpApplication1 application)
   --- End of inner exception stack trace ---
   --- End of inner exception stack trace ---
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder1.StateMachineBox1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadStreamMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func2 deserializer, CancellationToken cancellationToken)
   at Grpc.AspNetCore.Server.Internal.HttpContextStreamReader1.<MoveNext>g__MoveNextAsync|9_0(ValueTask1 readStreamTask)
   at GrpcService1.Services.UploaderService.UploadFileStream(IAsyncStreamReader1 request, ServerCallContext context) in C:\Development\FreeLance\Glassix\gRCPTest\grcp-samaple\gRCPService\GrpcService1\Services\UploaderService.cs:line 16
   at Grpc.Shared.Server.ClientStreamingServerMethodInvoker3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader1 requestStream)
   at Grpc.Shared.Server.ClientStreamingServerMethodInvoker3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, IAsyncStreamReader1 requestStream)
   at Grpc.AspNetCore.Server.Internal.CallHandlers.ClientStreamingServerCallHandler3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
   at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerCallHandlerBase3.<HandleCallAsync>g__AwaitHandleCall|8_0(HttpContextServerCallContext serverCallContext, Method2 method, Task handleCall)

  1. What is closing the stream?
  2. Shouldn't the stream stay open until the client gets the response back from the server saying that the file was uploaded?

Note: Updated to fix a few typos noted in answer below.

CodePudding user response:

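I finally found the issue. The key is to call await uploadStream.RequestStream.CompleteAsync(); once the last chunk has been written. Without it the server keeps waiting for more messages, and its read fails once the client disposes the call or exits. The corrected client below also sends the bytes it actually read, via ByteString.CopyFrom, instead of calling ByteString.FromStreamAsync on a stream whose position Read has already advanced.

I found this single line in the documentation, under "Client streaming call":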

When the client has finished sending messages, RequestStream.CompleteAsync() should be called to notify the service.

using System.Buffers;
using Google.Protobuf;
using Grpc.Net.Client;

public static async Task Upload(GrpcChannel channel, string filePath)
{
    var clientUpFile = new Uploader.UploaderClient(channel);
    using var uploadStream = clientUpFile.UploadFileStream();

    // Check that the file exists before uploading
    if (!File.Exists(filePath)) throw new FileNotFoundException("File not found.", filePath);

    var buffer = ArrayPool<byte>.Shared.Rent(32 * 1024);
    try
    {
        using FileStream fileStream = File.OpenRead(filePath);
        int lastBytesRead;

        while ((lastBytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)) != 0)
        {
            // Send only the bytes that were actually read in this iteration
            await uploadStream.RequestStream.WriteAsync(new UploadFileRequest()
            {
                Data = ByteString.CopyFrom(buffer, 0, lastBytesRead),
                FileName = Path.GetFileName(filePath)
            });
        }

        // Notify the service when the client has finished sending messages
        await uploadStream.RequestStream.CompleteAsync();
    }
    finally
    {
        // Return the rented buffer even if reading or writing throws
        ArrayPool<byte>.Shared.Return(buffer);
    }

    // Read the response
    var response = await uploadStream.ResponseAsync;
    Console.WriteLine(response.FilePath);
}
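
To illustrate why the completion call matters, here is a small sketch that uses System.Threading.Channels as a stand-in for the gRPC request stream. It is only an analogy, not gRPC code, and CompletionDemo and its members are made-up names. The consumer loop plays the role of the server's read loop: it can return only after the writer signals that no more items will follow.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompletionDemo
{
    // Plays the role of the server: reads chunks until the writer completes.
    public static async Task<int> ConsumeAsync(ChannelReader<byte[]> reader)
    {
        var totalBytes = 0;
        await foreach (var chunk in reader.ReadAllAsync())
        {
            totalBytes += chunk.Length;
        }
        // Reached only after the writer has signalled completion
        return totalBytes;
    }

    // Plays the role of the client: writes two chunks, then completes.
    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<byte[]>();
        var consumer = ConsumeAsync(channel.Reader);

        await channel.Writer.WriteAsync(new byte[3]);
        await channel.Writer.WriteAsync(new byte[2]);

        // Without this call the consumer loop above would never finish
        channel.Writer.Complete();

        return await consumer;
    }
}
```

Awaiting CompletionDemo.RunAsync() yields the total number of bytes consumed. If the Complete() call is removed, the await never returns, which is roughly the position the server is in when the client never calls CompleteAsync().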

Final UploaderService

For anyone who needs this in the future: this version stores the uploaded file on disk.

using Grpc.Core;

namespace GrpcService1.Services;

public class UploaderService : Uploader.UploaderBase
{
    private readonly ILogger<UploaderService> _logger;

    public UploaderService(ILogger<UploaderService> logger)
    {
        _logger = logger;
    }

    public override async Task<UploadFileResponse> UploadFileStream(IAsyncStreamReader<UploadFileRequest> request,
        ServerCallContext context)
    {
        try
        {
            var dir = "Files";
            var fileName = "temp";
            var tempPath = Path.Combine(dir, "temp");
            Directory.CreateDirectory(dir);

            // File.Create truncates a temp file left over from an earlier upload
            await using (var fs = System.IO.File.Create(tempPath))
            {
                await foreach (var chunkMsg in request.ReadAllAsync().ConfigureAwait(false))
                {
                    // Keep only the name part so the client cannot choose the directory
                    fileName = Path.GetFileName(chunkMsg.FileName);

                    await fs.WriteAsync(chunkMsg.Data.Memory);
                }
            }

            System.IO.File.Move(tempPath, Path.Combine(dir, fileName), true);

            _logger.LogDebug(@"[FileUploaded] 'Files\{FileName}' uploaded", fileName);
            return new UploadFileResponse() { FilePath = $@"Files\{fileName}" };
        }
        catch (Exception e)
        {
            // Assumes UploadFileResponse has a string errorMessage field,
            // which the upload.proto shown in the question does not define
            return new UploadFileResponse() { ErrorMessage = e.Message };
        }
    }
}

CodePudding user response:

I would guess that the missing await here:

uploadStream.RequestStream.WriteAsync(...);

is causing invalid overlapped writes, crashing the client code, causing the end of the using scope of uploadStream, destroying that conversation. The fact that you have async void and no exception handling means that you probably just aren't observing this failure in the client.

Suggestions:

  1. add the await
  2. never use async void
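
As a rough illustration of the second suggestion, the sketch below shows that a failure inside an async Task method reaches the caller that awaits it. AsyncTaskDemo, FailingUploadAsync and ObserveAsync are hypothetical names, and the thrown exception only stands in for a failed upload. With async void there is no Task to await, so the same try/catch in the caller would not see the exception.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncTaskDemo
{
    // Stand-in for an upload that fails part-way through.
    // Because it returns Task, the caller can await it and observe the failure.
    public static async Task FailingUploadAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("upload failed");
    }

    // Awaits the upload and reports what happened.
    public static async Task<string> ObserveAsync()
    {
        try
        {
            await FailingUploadAsync();
            return "no error";
        }
        catch (InvalidOperationException e)
        {
            // The exception thrown inside the async method surfaces here
            return e.Message;
        }
    }
}
```

Awaiting AsyncTaskDemo.ObserveAsync() returns the exception message, so the failure is visible instead of silently ending the call.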

As a side note: you can probably lease your buffer here:

var buffer = ArrayPool<byte>.Shared.Rent(32 * 1024); // or whatever
// ...
while ((lastBytesRead = fileStream.Read(buffer, 0, buffer.Length)) != 0)
{ ... }
// ...
ArrayPool<byte>.Shared.Return(buffer);
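
A slightly fuller sketch of the same leasing pattern follows, assuming a plain Stream as the source. ChunkReader and ReadChunkSizes are made-up names, and the method only records chunk sizes where real code would send each chunk. The try/finally returns the buffer even if the loop throws, and because Rent may hand back a larger array than requested, reads are capped at chunkSize.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;

public static class ChunkReader
{
    // Reads the stream in chunks of at most chunkSize bytes using a pooled
    // buffer and returns the size of each chunk that was read.
    public static List<int> ReadChunkSizes(Stream source, int chunkSize)
    {
        var sizes = new List<int>();
        var buffer = ArrayPool<byte>.Shared.Rent(chunkSize);
        try
        {
            int read;
            // Rent may return a larger array, so cap each read at chunkSize
            while ((read = source.Read(buffer, 0, chunkSize)) != 0)
            {
                // Only buffer[0..read) holds valid data for this chunk
                sizes.Add(read);
            }
        }
        finally
        {
            // Return the buffer to the pool even if reading throws
            ArrayPool<byte>.Shared.Return(buffer);
        }
        return sizes;
    }
}
```

For a 10-byte MemoryStream and a chunkSize of 4, this yields chunks of 4, 4 and 2 bytes.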