Kafka consumer .NET 'Protocol message end-group tag did not match expected tag.'


I am trying to read data from Kafka, as you can see:

var config = new ConsumerConfig
{
    BootstrapServers = "*******",
    GroupId = Guid.NewGuid().ToString(),
    AutoOffsetReset = AutoOffsetReset.Earliest
};
MessageParser<AdminIpoChange> parser = new(() => new AdminIpoChange());
using (var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build())
{
    consumer.Subscribe("AdminIpoChange");

    while (true)
    {
        var cr = consumer.Consume();
        AdminIpoChange item = parser.ParseFrom(cr.Message.Value);
    }

    consumer.Close();
}

I am using Google Protobuf to send and receive the data. This code throws the following error at the parser line:

 KafkaConsumer.ConsumeAsync: Protocol message end-group tag did not match expected tag.
Google.Protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
   at Google.Protobuf.ParsingPrimitivesMessages.CheckLastTagWas(ParserInternalState& state, UInt32 expectedTag)
   at Google.Protobuf.ParsingPrimitivesMessages.ReadGroup(ParseContext& ctx, Int32 fieldNumber, UnknownFieldSet set)
   at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(ParseContext& ctx)
   at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(UnknownFieldSet unknownFields, ParseContext& ctx)
   at AdminIpoChange.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 213
   at Google.Protobuf.ParsingPrimitivesMessages.ReadRawMessage(ParseContext& ctx, IMessage message)
   at Google.Protobuf.CodedInputStream.ReadRawMessage(IMessage message)
   at AdminIpoChange.MergeFrom(CodedInputStream input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 188
   at Google.Protobuf.MessageExtensions.MergeFrom(IMessage message, Byte[] data, Boolean discardUnknownFields, ExtensionRegistry registry)
   at Google.Protobuf.MessageParser`1.ParseFrom(Byte[] data)
   at infrastructure.Queue.Kafka.KafkaConsumer.ConsumeCarefully[T](Func`2 consumeFunc, String topic, String group) in D:\MofidProject\infrastructure\Queue\Kafka\KafkaConsumer.cs:line 168


Update:

My sample data that comes from Kafka:

 - {"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757 03:30"}

My Rlc model:

syntax = "proto3";

message AdminIpoChange
{
    string Id = 1;
    string SymbolName = 2;
    string SymbolIsin = 3;
    string Date = 4;
    string Time = 5;
    double MinPrice = 6;
    double MaxPrice = 7;
    int32 Share = 8;
    bool Show = 9;
    int32 Operation = 10;
    string CreateDateTime = 11;

    enum AdminIpoOperation
    {
        Add = 0;
        Edit = 1;
        Delete = 2;
    }
}

My data in bytes:

7B 22 53 79 6D 62 6F 6C 4E 61 6D 65 22 3A 22 5C 75 30 36 34 31 5C 75 30 36 32 46 5C 75 30 
36 33 31 22 2C 22 53 79 6D 62 6F 6C 49 73 69 6E 22 3A 22 49 52 6F 33 70 7A 41 5A 30 30 30 
32 22 2C 22 44 61 74 65 22 3A 22 31 34 30 30 2F 31 32 2F 31 35 22 2C 22 54 69 6D 65 22 3A 
22 30 38 3A 30 30 2D 31 32 3A 30 30 22 2C 22 4D 69 6E 50 72 69 63 65 22 3A 31 37 37 32 36 
2C 22 4D 61 78 50 72 69 63 65 22 3A 32 31 36 36 36 2C 22 53 68 61 72 65 22 3A 31 30 30 30 
2C 22 53 68 6F 77 22 3A 66 61 6C 73 65 2C 22 4F 70 65 72 61 74 69 6F 6E 22 3A 30 2C 22 49 
64 22 3A 22 31 30 30 64 38 65 30 62 35 34 31 35 34 65 39 64 39 30 32 30 35 34 62 66 66 31 
39 33 65 38 37 35 22 2C 22 43 72 65 61 74 65 44 61 74 65 54 69 6D 65 22 3A 22 32 30 32 32 
2D 30 32 2D 32 36 54 30 39 3A 34 37 3A 32 30 2E 30 31 33 34 37 35 37 2B 30 33 3A 33 30 22 
7D 

CodePudding user response:

The data is definitely not protobuf binary; byte 0 starts a group with field number 15. Inside this group we find:

  • field 4, string
  • field 13, fixed32
  • field 6, varint
  • field 12, fixed32
  • field 6, varint

After this (at byte 151), an end-group token is encountered with field number 6.
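For reference, here's a minimal sketch (not part of the original analysis) of how that first tag byte decodes; a protobuf tag packs the field number and wire type together, and for field numbers up to 15 the whole tag fits in a single byte:

// Decode a single-byte protobuf tag: the low 3 bits are the wire type,
// the remaining bits are the field number.
byte firstByte = 0x7B; // 123 decimal, the first byte of this payload

int fieldNumber = firstByte >> 3;  // 123 >> 3 = 15
int wireType    = firstByte & 0x7; // 123 & 7  = 3 (start-group)

Console.WriteLine($"field {fieldNumber}, wire type {wireType}");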

There are many striking things about this:

  1. your schema doesn't use groups (in fact, the mere existence of groups is now hard to find in the docs), so ... none of this looks right
  2. an end-group token is always required to match the last start-group field number, which this one doesn't
  3. fields inside a single level are usually (although as a "should", not a "must") written in numerical order
  4. you have no field 12 or 13 declared
  5. your field 6 is of the wrong type - it is declared double, so we expect fixed64 here, but got varint
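For contrast, a minimal sketch (assuming the AdminIpoChange class generated from the proto above) of what valid binary for this message would start with:

using Google.Protobuf;

var msg = new AdminIpoChange
{
    Id = "100d8e0b54154e9d902054bff193e875",
    SymbolIsin = "IRo3pzAZ0002",
    MinPrice = 17726,
    Share = 1000
};

// Valid binary for this message begins with tag 0x0A: field 1 (Id, string),
// wire type 2 (length-delimited) - nothing like the 0x7B this payload starts with.
byte[] bytes = msg.ToByteArray();
Console.WriteLine(BitConverter.ToString(bytes));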

So: there's no doubt about it: that data is ... not what you expect. It certainly isn't valid protobuf binary. Without knowing how that data is stored, all we can do is guess, but on a hunch: let's try decoding it as UTF8 and see what it looks like:

{"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757 03:30"}

or (formatted)

{
  "SymbolName": "\u0641\u062F\u0631",
  "SymbolIsin": "IRo3pzAZ0002",
  "Date": "1400/12/15",
  "Time": "08:00-12:00",
  "MinPrice": 17726,
  "MaxPrice": 21666,
  "Share": 1000,
  "Show": false,
  "Operation": 0,
  "Id": "100d8e0b54154e9d902054bff193e875",
  "CreateDateTime": "2022-02-26T09:47:20.0134757+03:30"
}

Oops! You've written the data as JSON, and you're trying to decode it as binary protobuf. Decode it as JSON instead, and you should be fine. If this was written with the protobuf JSON API, decode it with the protobuf JSON API.
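For example, a minimal sketch of the consume loop using protobuf's JSON API (JsonParser) instead of the binary parser; this assumes the producer wrote protobuf-compatible JSON, reuses the config from the question, and switches the value deserializer to string so the payload arrives as text:

using Confluent.Kafka;
using Google.Protobuf;

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("AdminIpoChange");

while (true)
{
    var cr = consumer.Consume();

    // Parse the JSON payload with the protobuf JSON API instead of
    // MessageParser<T>.ParseFrom, which expects binary wire format.
    AdminIpoChange item = JsonParser.Default.Parse<AdminIpoChange>(cr.Message.Value);
}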
