I am trying to get gRPC working in a DigitalOcean Kubernetes cluster, but have not succeeded yet.
PS: Apologies for the long content
I have found some material on this, but it all revolves around ingress. In my case, these are internal services.
I have a .proto defined as follows:
syntax = "proto3";
package imgproto;
option go_package = ".;imgproto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
service ImaginaryServer {
  rpc Ping(PingPayload) returns (PongPayload) {}
}
message PingPayload {
  google.protobuf.Timestamp ts = 1;
}
message PongPayload {
  google.protobuf.Timestamp ts = 1;
}
After running protoc-gen-go, I implement the service with:
type ImaginaryServerImpl struct {
	imgproto.UnimplementedImaginaryServer
}
func (s *ImaginaryServerImpl) Ping(_ context.Context, in *imgproto.PingPayload) (*imgproto.PongPayload, error) {
	fmt.Printf("ImaginaryServerImpl.Ping: %v\n", in)
	return &imgproto.PongPayload{
		Ts: timestamppb.New(time.Now()),
	}, nil
}
I create a gRPC server and register the implementation with it:
grpcServer := grpc.NewServer()
imgproto.RegisterImaginaryServer(grpcServer, &ImaginaryServerImpl{})
And start the server:
grpcListener, err := net.Listen("tcp", fmt.Sprintf(":%d", constants.PORT_GRPC))
if err != nil {
	return err
}
go func() {
	if err := grpcServer.Serve(grpcListener); err != nil {
		fmt.Println("GRPC Server startup failed with", err)
	}
}()
<-ctx.Done()
grpcServer.GracefulStop()
I wrote the client as:
grpcConnection, err := grpc.Dial(
	endpoint,
	grpc.WithBlock(),
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		fmt.Println("Calling GRPC:", method, req, reply, invoker)
		return nil
	}))
if err != nil {
	return ctx, err
}
client := imgproto.NewImaginaryClient(grpcConnection)
fmt.Println(" >>>>>>>>>>> PING:")
pong, pingErr := client.Ping(ctx, &imgproto.PingPayload{Ts: timestamppb.Now()}, grpc.WaitForReady(false))
if pingErr != nil {
	fmt.Println(pingErr)
}
fmt.Println(" >>>>>>>>>>> PONG: ", pong.Ts.AsTime().String())
But it looks like the client is returning without actually invoking the RPC.
This is the log I am seeing in the client:
>>>>>>>>>>> PING:
Calling GRPC: /imgproto.ImaginaryServer/Ping ts:{seconds:1666113879 nanos:778900352} 0x127aa60
>>>>>>>>>>> PONG: 1970-01-01 00:00:00 +0000 UTC
There are no logs in the server.
My Kubernetes YAML is as follows:
apiVersion: v1
kind: Service
metadata:
  name: reality
  namespace: reality-ns
spec:
  type: ClusterIP
  selector:
    app: reality
  ports:
    - name: grpc
      protocol: TCP
      port: 6772
      targetPort: 6772
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: reality
  namespace: reality-ns
  labels:
    app: reality
spec:
  selector:
    matchLabels:
      app: reality
  template:
    metadata:
      labels:
        app: reality
    spec:
      containers:
        - name: reality
          image: registry.example.com/binaek/reality
          imagePullPolicy: Always
          command: ["cmd"]
          ports:
            - name: grpc
              containerPort: 6772
I am not able to locate what I am doing wrong. Desperately need help at this point.
The image uses gcr.io/distroless/base as its base.
Answer:
Your interceptor is likely what prevents the RPC from executing: it logs the call and returns nil without ever calling invoker. From the grpc-go source:
// When a unary interceptor(s) is set on a ClientConn, gRPC
// delegates all unary RPC invocations to the interceptor, and it is the
// responsibility of the interceptor to call invoker to complete the processing
// of the RPC.
https://github.com/grpc/grpc-go/blob/9127159caf5a3879dad56b795938fde3bc0a7eaa/interceptor.go#L31-L34
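To make that contract concrete, here is a minimal stdlib-only sketch of the delegation. It does not use the real gRPC API: the invoker and interceptor types, fakeTransport, and call are all hypothetical stand-ins I made up for illustration, with strings in place of protobuf messages.

```go
package main

import (
	"context"
	"fmt"
)

// invoker is a simplified stand-in for grpc.UnaryInvoker: it performs the
// actual call and writes the response into reply.
type invoker func(ctx context.Context, method, req string, reply *string) error

// interceptor is a simplified stand-in for grpc.UnaryClientInterceptor.
type interceptor func(ctx context.Context, method, req string, reply *string, next invoker) error

// fakeTransport plays the role of the network round trip to the server.
func fakeTransport(_ context.Context, _, _ string, reply *string) error {
	*reply = "pong"
	return nil
}

// swallowing mirrors the interceptor from the question: it logs and returns
// nil, so next is never called and reply is never filled in.
func swallowing(_ context.Context, method, req string, _ *string, _ invoker) error {
	fmt.Println("Calling:", method, req)
	return nil
}

// forwarding logs and then delegates to next, which completes the call.
func forwarding(ctx context.Context, method, req string, reply *string, next invoker) error {
	fmt.Println("Calling:", method, req)
	return next(ctx, method, req, reply)
}

// call mimics a client connection with an interceptor installed: every
// invocation goes through the interceptor, which decides whether to proceed.
func call(ic interceptor) (string, error) {
	var reply string
	err := ic(context.Background(), "/imgproto.ImaginaryServer/Ping", "ping", &reply, fakeTransport)
	return reply, err
}

func main() {
	reply, err := call(swallowing)
	fmt.Printf("swallowing: reply=%q err=%v\n", reply, err)

	reply, err = call(forwarding)
	fmt.Printf("forwarding: reply=%q err=%v\n", reply, err)
}
```

In this model the swallowing interceptor yields an empty reply together with a nil error, which matches what the client in the question observes: no error, no server log, and an unpopulated response.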
So your interceptor function should instead look like:
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	fmt.Println("Calling GRPC:", method, req, reply, invoker)
	return invoker(ctx, method, req, reply, cc, opts...)
}))
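The epoch value in your PONG log line fits this diagnosis. With the RPC skipped, the response message stays empty, and as far as I know timestamppb's AsTime treats an unset timestamp as zero seconds and zero nanos, which is the Unix epoch. A small stdlib-only sketch of that conversion (zeroTimestampString is a hypothetical helper name, not part of any library):

```go
package main

import (
	"fmt"
	"time"
)

// zeroTimestampString formats the instant for seconds=0, nanos=0 in UTC,
// which is what an unpopulated timestamp field would convert to.
func zeroTimestampString() string {
	return time.Unix(0, 0).UTC().String()
}

func main() {
	fmt.Println(zeroTimestampString())
}
```

If the PONG line shows a current time after you switch to the forwarding interceptor, the call is reaching the server.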