Home > other >  gRPC Async Server : send stream from client to server: assertion failed: ok
gRPC Async Server : send stream from client to server: assertion failed: ok

Time:09-01


I'm trying to set up a gRPC async server that receives a stream from the client (using ServerAsyncReader). I'm a bit confused by the process, and I'm stuck on an assertion error.
Here is my GreeterAsyncServer.

The failing assertion is the line GPR_ASSERT(ok); — ok is false when the client sends colors (strings).

// Server main loop: registers the first call handler, then dispatches every
// completion-queue event to the handler object whose address is the event tag.
void GreeterAsyncServer::HandleRpcs() {
    // Seed the queue with one handler so the first client can be served.
    new CallDataSendColors(&service_, cq_.get(), this);

    for (;;) {
        void* eventTag = nullptr;  // address of the handler that owns the event
        bool ok = false;           // success flag of the completed operation

        // Blocks until the next event is available. Next() returns false once
        // the queue is shutting down, which this loop treats as fatal.
        GPR_ASSERT(cq_->Next(&eventTag, &ok));
        // Any operation that completes with ok == false aborts the process here.
        GPR_ASSERT(ok);
        static_cast<CallData*>(eventTag)->Proceed();
    }
}
void GreeterAsyncServer::Run(std::string server_address) {
    ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    // Register "service" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService(&service_);
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRpcs();
}

Here is my .proto file:

// .proto file
// The greeting service definition.
service Greeter {
  // Client-streaming RPC: the client sends a stream of ColorRequest messages
  // and the server answers with a single HelloReply.
  rpc SendColors (stream ColorRequest) returns (HelloReply) {}
}

// The response message containing the greeting; returned by SendColors.
message HelloReply {
  // Reply text sent back to the client.
  string message = 1;
}

// The request message containing the color; one is sent per stream element
// of SendColors.
message ColorRequest {
  // The color, as a string.
  string color = 1;
}

Here is my CallData class:

#pragma once

#include "CallDataT.h"

using grpc::ServerAsyncReader;
using helloworld::HelloReply;
using helloworld::ColorRequest;

// Per-call state for the client-streaming SendColors RPC. Each instance
// registers itself with the service, is driven through Proceed() once per
// completion-queue event (its own address is the event tag), and deletes
// itself when the call is finished.
//
// NOTE(review): the `override` specifiers and the `observer_` member used
// below imply a base class (presumably declared in CallDataT.h) that is not
// visible in this listing — confirm the inheritance declaration.
class CallDataSendColors {
protected:
    // Lifecycle states advanced by Proceed(): CREATE -> PROCESS -> FINISH.
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;
    // Asynchronous service this call is registered with.
    Greeter::AsyncService* service_;
    // Queue on which this call's events are delivered.
    ServerCompletionQueue* completionQueue_;
    ServerContext serverContext_;

    // What we get from the client.
    ColorRequest request_;
    // What we send back to the client.
    HelloReply reply_;
    // The means to get back to the client.
    ServerAsyncReader<HelloReply, ColorRequest> reader_;

    // Creates the handler that will serve the next incoming call.
    virtual void AddNextToCompletionQueue() override {
        new CallDataSendColors(service_, completionQueue_, observer_);
    }

    // Asks the runtime to notify this object (tag == this) when a
    // SendColors call arrives.
    virtual void WaitForRequest() override {
        service_->RequestSendColors(&serverContext_, &reader_, completionQueue_, completionQueue_, this);
    }

    // Requests one message from the client stream into request_.
    virtual void HandleRequest() override {
        reader_.Read(&request_, this);
    }

    // Advances the state machine by one step; called once from the
    // constructor and then once per completion-queue event for this object.
    virtual void Proceed() override {
        if (status_ == CREATE) {
            status_ = PROCESS;
            WaitForRequest();
        }
        else if (status_ == PROCESS) {
            // Make sure another handler is waiting before serving this call.
            AddNextToCompletionQueue();
            // NOTE(review): a single Read() and the Finish() below are both
            // issued in this step with the same tag, without waiting for the
            // read to complete; no further Read() is ever requested.
            HandleRequest();
            status_ = FINISH;
            reply_.set_message(std::string("Color ") + std::string("received"));
            reader_.Finish(reply_, grpc::Status::OK, this);
        }
        else {
            // We're done! Self-destruct!
            if (status_ != FINISH) {
                // Log some error message
            }
            delete this;
        }
    }

public:
    // Stores the service and queue, binds the reader to this call's context,
    // and immediately runs the first Proceed() step (CREATE).
    // NOTE(review): `observer` is not used in this constructor as shown.
    CallDataSendColors(Greeter::AsyncService* service, ServerCompletionQueue* completionQueue, IObserver* observer = nullptr) :
        status_(CREATE),
        service_(service),
        completionQueue_(completionQueue),
        reader_(&serverContext_) {            
        Proceed();
    }
};

As you can see in the HandleRequest() method, reader_.Read(&request_, this) is called only once. I don't know how to keep calling it for as long as there are messages coming in.

I would appreciate it if someone could help me.

Thank you in advance.

CodePudding user response:

I've found the solution. The method GreeterAsyncServer::HandleRpcs() must be edited like this:

    // Keep dispatching events until the queue stops delivering them or a
    // shutdown has been requested.
    while (cq_->Next(&tag, &ok) && !isShuttingDown_) {
        // Instead of asserting, events that completed with ok == false are
        // skipped: their handler's Proceed() is not invoked.
        if (!ok) {
            continue;
        }
        static_cast<CallData*>(tag)->Proceed();
    }

CodePudding user response:

My answer may not be the best, but I think it's one of the valid ones.

// The request message containing the color
message ColorRequest {
  // The color, as a string.
  string color = 1;
  // End-of-stream marker: set on the client's last message so the server
  // knows no more colors will follow.
  bool eof = 2;
}

You should add a field such as eof to the ColorRequest message, because the server needs to know which message is the last one in a client-streaming RPC.

// Advances the call's state machine by one step per completion-queue event:
//   CREATE   -> register for an incoming SendColors call
//   INITIATE -> call arrived: spawn the next handler and request the first message
//   PROGRESS -> a message was read: finish on eof, otherwise request the next one
//   FINISH   -> reply was sent: destroy this object
// Requires CallStatus to declare INITIATE and PROGRESS in addition to
// CREATE and FINISH.
virtual void Proceed() override {
    if (status_ == CREATE) {
        // Must move to INITIATE (not PROCESS): the branches below test
        // INITIATE/PROGRESS, so any other value would fall through to the
        // self-destruct branch as soon as the call arrives.
        status_ = INITIATE;
        WaitForRequest();
    }
    else if (status_ == INITIATE) {
        status_ = PROGRESS;
        AddNextToCompletionQueue();
        reader_.Read(&request_, this);
    } else if (status_ == PROGRESS) {
        // handle request
        if (request_.eof()) {
            // Client flagged its last message: send the single reply.
            status_ = FINISH;
            reply_.set_message(std::string("Color ") + std::string("received"));
            reader_.Finish(reply_, grpc::Status::OK, this);
        } else {
            // More messages expected: queue the next read.
            reader_.Read(&request_, this);
        }
    } else {
        // We're done! Self-destruct!
        if (status_ != FINISH) {
            // Log some error message
        }
        delete this;
    }
}

I think it is better to explain this with code than with a detailed description. (In fact, it would be difficult for me to explain in English, as I do not speak it well. I ask for your understanding.)

I am also attaching a unit test for client-streaming RPCs. The code is from the gRPC source tree. I think it will help you understand how client-streaming RPCs work.

// End-to-end check of a client-streaming call: the client writes two
// messages, signals WritesDone, and the server replies once via Finish.
// Every async operation is given a numeric tag, and Verifier checks the
// `ok` flag each tag completes with.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;
  ClientContext cli_ctx;
  ServerContext srv_ctx;
  ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  // Client starts the streaming call (tag 1).
  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
      stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));

  // Server registers to receive the call (tag 2).
  service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                                 tag(2));

  Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());

  // First message: client write (tag 3) paired with a server read (tag 4).
  cli_stream->Write(send_request, tag(3));
  srv_stream.Read(&recv_request, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  // Second message: a new Read is issued for each message to receive.
  cli_stream->Write(send_request, tag(5));
  srv_stream.Read(&recv_request, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  EXPECT_EQ(send_request.message(), recv_request.message());
  // Client signals end of stream (tag 7). The server's next Read (tag 8)
  // is expected to complete with ok == false.
  cli_stream->WritesDone(tag(7));
  srv_stream.Read(&recv_request, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  // Only after the failed read does the server send its single response.
  send_response.set_message(recv_request.message());
  srv_stream.Finish(send_response, Status::OK, tag(9));
  cli_stream->Finish(&recv_status, tag(10));
  Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
}

ref. https://github.com/grpc/grpc/blob/622dd886e6e64ef11820a07167541c2442b48c1e/test/cpp/end2end/async_end2end_test.cc#L558-L600

I hope this helps.

  • Related