I'm trying to set up a gRPC async server that receives a stream from the client (using ServerAsyncReader). I'm a bit confused by the process, and I'm stuck on an assertion failure.
Here is my GreeterAsyncServer.
The failing assertion is the line GPR_ASSERT(ok); — ok is false when the client sends colors (strings).
// Main event loop of the server.
//
// Seeds the first CallDataSendColors handler, then repeatedly pulls events
// from the completion queue and forwards each successful one to the handler
// identified by its tag. Returns when the completion queue reports that it
// is shutting down (Next() returns false).
void GreeterAsyncServer::HandleRpcs() {
    // Spawn a new CallData instance to serve new clients. The instance is
    // deliberately not stored here: its address travels through the
    // completion queue as the tag.
    new CallDataSendColors(&service_, cq_.get(), this);

    void* tag = nullptr;  // uniquely identifies a request.
    bool ok = false;

    // Block waiting to read the next event from the completion queue. The
    // event is uniquely identified by its tag, which in this case is the
    // memory address of a CallData instance.
    // The return value of Next tells us whether there is any kind of event
    // or cq_ is shutting down, so a false result ends the loop instead of
    // aborting the process.
    while (cq_->Next(&tag, &ok)) {
        if (!ok) {
            // A false `ok` is an expected outcome, not a programming error:
            // for a client-streaming call it is how a Read reports that no
            // further message will arrive. Asserting on it crashed the
            // server, so the event is skipped instead.
            // NOTE(review): the handler behind `tag` is neither advanced nor
            // released here, because Proceed() has no way to receive `ok`.
            continue;
        }
        static_cast<CallData*>(tag)->Proceed();
    }
}
void GreeterAsyncServer::Run(std::string server_address) {
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
Here is my .proto file:
// .proto file
// The greeting service definition.
service Greeter {
// Client-streaming RPC: the client sends a stream of ColorRequest messages
// and the server answers with a single HelloReply.
rpc SendColors (stream ColorRequest) returns (HelloReply) {}
}
// The response message containing the greeting; returned by SendColors.
message HelloReply {
// Text of the reply.
string message = 1;
}
// The request message containing the color; one element of the stream
// accepted by SendColors.
message ColorRequest {
// The color, carried as a string.
string color = 1;
}
Here is my CallData class:
#pragma once
#include "CallDataT.h"
using grpc::ServerAsyncReader;
using helloworld::HelloReply;
using helloworld::ColorRequest;
// Per-call state machine for the client-streaming SendColors RPC.
//
// An instance registers itself for the next incoming call, reads one
// ColorRequest from the client, answers with a HelloReply and then deletes
// itself. Its own address is used as the completion-queue tag for every
// asynchronous operation it starts, and each completion is expected to be
// delivered through Proceed().
class CallDataSendColors {
protected:
    // READING is appended after FINISH so the existing enumerators keep
    // their numeric values.
    enum CallStatus { CREATE, PROCESS, FINISH, READING };

    // Current position in the state machine.
    CallStatus status_;
    // Asynchronous service used to request incoming SendColors calls.
    Greeter::AsyncService* service_;
    // Queue on which this call's events are delivered.
    ServerCompletionQueue* completionQueue_;
    // Per-call context handed to the gRPC runtime.
    ServerContext serverContext_;
    // What we get from the client.
    ColorRequest request_;
    // What we send back to the client.
    HelloReply reply_;
    // The means to get back to the client.
    ServerAsyncReader<HelloReply, ColorRequest> reader_;

    // Creates the handler that will serve the next incoming call.
    // NOTE(review): observer_ is not declared in this class; presumably it
    // comes from a base class that is not visible here - confirm.
    virtual void AddNextToCompletionQueue() override {
        new CallDataSendColors(service_, completionQueue_, observer_);
    }

    // Asks the runtime to notify us, tagged with `this`, when a client
    // starts a SendColors call.
    virtual void WaitForRequest() override {
        service_->RequestSendColors(&serverContext_, &reader_, completionQueue_, completionQueue_, this);
    }

    // Starts an asynchronous read of one message into request_.
    virtual void HandleRequest() override {
        reader_.Read(&request_, this);
    }

    // Advances the state machine by one step. Called once per completed
    // event carrying this object's tag.
    virtual void Proceed() override {
        if (status_ == CREATE) {
            status_ = PROCESS;
            WaitForRequest();
        }
        else if (status_ == PROCESS) {
            // A client has started a call: make sure another handler is
            // waiting for the next client, then read this client's message.
            AddNextToCompletionQueue();
            // The state is updated before the operation is started, as in
            // the CREATE branch above.
            status_ = READING;
            HandleRequest();
        }
        else if (status_ == READING) {
            // The read has completed. Finish is issued only now, so that
            // Read and Finish are never outstanding at the same time;
            // previously both were started back to back with the same tag,
            // and the first completion deleted the object while the second
            // was still pending.
            status_ = FINISH;
            reply_.set_message(std::string("Color ") + std::string("received"));
            reader_.Finish(reply_, grpc::Status::OK, this);
        }
        else {
            // We're done! Self-destruct!
            if (status_ != FINISH) {
                // Log some error message
            }
            delete this;
        }
    }

public:
    // Stores the service and completion queue, then immediately takes the
    // first state-machine step, which registers this object for the next
    // incoming call. `observer` is accepted but not used by this
    // constructor.
    CallDataSendColors(Greeter::AsyncService* service, ServerCompletionQueue* completionQueue, IObserver* observer = nullptr) :
        status_(CREATE),
        service_(service),
        completionQueue_(completionQueue),
        reader_(&serverContext_) {
        Proceed();
    }
};
As you can see in the HandleRequest() method, reader_.Read(&request_, this) is called only once. I don't know how to keep calling it for as long as messages are coming in.
I would appreciate it if someone could help me.
Thank you in advance.
CodePudding user response:
I've found the solution. The method GreeterAsyncServer::HandleRpcs() must be edited like this:
while (true) {
bool gotEvent = cq_->Next(&tag, &ok);
if (false == gotEvent || true == isShuttingDown_) {
break;
}
//GPR_ASSERT(ok);
if (true == ok) {
static_cast<CallData*>(tag)->Proceed();
}
}
CodePudding user response:
My answer may not be the best, but I think it's one of the valid ones.
// The request message containing the color
message ColorRequest {
// The color, carried as a string.
string color = 1;
// Marks the last message of the client stream; the server checks this flag
// to decide when to send its reply.
bool eof = 2;
}
You should add a field such as eof to the ColorRequest message, because the server needs to know which message is the last one in the client-streaming RPC.
// Advances the call's state machine by one step; invoked once per completed
// event carrying this object's tag.
//
// CREATE   -> INITIATE : register for an incoming call.
// INITIATE -> PROGRESS : call arrived; spawn the next handler, start reading.
// PROGRESS             : a message was read; keep reading until one has
//                        eof set, then send the reply and go to FINISH.
// FINISH (or anything else): delete this object.
virtual void Proceed() override {
    if (status_ == CREATE) {
        // Must be INITIATE: no branch below tests for PROCESS, so leaving
        // PROCESS here sent the very next event to the final `else`, which
        // deleted the handler before anything was read.
        status_ = INITIATE;
        WaitForRequest();
    }
    else if (status_ == INITIATE) {
        status_ = PROGRESS;
        AddNextToCompletionQueue();
        reader_.Read(&request_, this);
    } else if (status_ == PROGRESS) {
        // handle request
        if (request_.eof()) {
            // The client flagged its last message: reply and finish.
            status_ = FINISH;
            reply_.set_message(std::string("Color ") + std::string("received"));
            reader_.Finish(reply_, grpc::Status::OK, this);
        } else {
            // More messages are expected: start the next read.
            reader_.Read(&request_, this);
        }
    } else {
        // We're done! Self-destruct!
        if (status_ != FINISH) {
            // Log some error message
        }
        delete this;
    }
}
I think it is better to explain with code than with a detailed explanation. (In fact, it would be difficult for me to explain in English. I do not speak English well, so I ask for your understanding.)
I am also attaching a unit test for the client-streaming RPC. The code is in the gRPC source tree. I think it will help you understand the client-streaming RPC mechanism.
// End-to-end walk through one client-streaming call, driving both the client
// and the server side from the same completion queue. Each asynchronous
// operation gets its own numeric tag, and each Verifier line states the
// expected `ok` value for the tags it waits on.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message(GetParam().message_content);
// Start the call on the client (tag 1) and request it on the server (tag 2);
// both are expected to complete with ok == true.
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
// First message: a client Write (tag 3) paired with a server Read (tag 4).
cli_stream->Write(send_request, tag(3));
srv_stream.Read(&recv_request, tag(4));
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
// Second message: same pattern with a fresh pair of tags (5 and 6).
cli_stream->Write(send_request, tag(5));
srv_stream.Read(&recv_request, tag(6));
Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
// The client signals it is done writing (tag 7). The server's next Read
// (tag 8) is expected to complete with ok == false.
cli_stream->WritesDone(tag(7));
srv_stream.Read(&recv_request, tag(8));
Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
// Only after the failed Read does the server send its single response and
// an OK status (tag 9); the client collects them with Finish (tag 10).
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
cli_stream->Finish(&recv_status, tag(10));
Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
I hope this helps.