Send/produce JSON message through Kafka

Time:11-24

This is my first time using Kafka, and I am planning to use it with .NET.

I wanted to know if I can send JSON as a message when I am producing an event.

I am following the tutorial: https://developer.confluent.io/get-started/dotnet/#build-producer

Also, is there a way to map that value to a model, so that the JSON structure of the value is always tied to that model?

For example, if I want my JSON value to be:

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

Most of the examples that I can find are like this:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
            return; // without a config path, args[0] below would throw
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

I would like the item to be an instance of a class, serialized as JSON.

CodePudding user response:

"I wanted to know if I can send JSON as a message when I am producing an event"

Yes. Kafka brokers only store bytes; the client converts your keys and values to bytes using serializers. When building a producer, you have the option of calling SetValueSerializer on the ProducerBuilder to choose the value serializer.

Some of the built-in serializers can be found at https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs

To handle arbitrary JSON model types generically, you'd need to write your own serializer by implementing ISerializer<T>.
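As a rough sketch of such a serializer, assuming System.Text.Json does the JSON encoding (the class names JsonValueSerializer and JsonProducerFactory are made up for illustration and are not part of Confluent.Kafka):

```csharp
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical generic serializer: turns any model type T into UTF-8 JSON bytes.
public class JsonValueSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
    {
        // A null value is passed through as a null payload (a tombstone).
        if (data is null) return null;
        return JsonSerializer.SerializeToUtf8Bytes(data);
    }
}

// Hypothetical helper that plugs the serializer into a producer.
public static class JsonProducerFactory
{
    public static IProducer<string, T> Create<T>(ProducerConfig config) =>
        new ProducerBuilder<string, T>(config)
            .SetValueSerializer(new JsonValueSerializer<T>())
            .Build();
}
```

With something like this, the producer is typed as IProducer<string, YourModel>, and the value of each Message<string, YourModel> is the model object itself rather than a string.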

When using the built-in UTF-8 string serializer (Serializers.Utf8, the default for string values), you'll need to serialize the object from your model class yourself, then send the resulting string as the value. In your example, you'd replace var item with the serialized object.

See: How do I turn a C# object into a JSON string in .NET?
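For the pre-serialization route, here is a minimal sketch using System.Text.Json. The CustomerEvent class, the EventType enum and its three members, and the CustomerEventJson helper are all assumptions modeled on the JSON in the question, not anything defined by Kafka:

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical enum standing in for "one-of-three-enums".
public enum EventType { Created, Updated, Deleted }

// Hypothetical model tied to the JSON structure from the question.
public class CustomerEvent
{
    [JsonPropertyName("customerName")]
    public string CustomerName { get; set; } = "";

    // Write the enum as its name ("Updated") instead of a number.
    [JsonPropertyName("eventType")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public EventType EventType { get; set; }

    [JsonPropertyName("columnsChanged")]
    public string ColumnsChanged { get; set; } = "";
}

public static class CustomerEventJson
{
    // Produces the JSON string to use as the message value.
    public static string ToJson(CustomerEvent e) => JsonSerializer.Serialize(e);
}
```

In the producer loop from the question, the value would then be something like Value = CustomerEventJson.ToJson(evt) in place of Value = item, with the producer still typed as <string, string>.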

When you use model classes, your data stays strongly typed unless you start writing the JSON by hand or use Dictionary types. If you want external message validation, the Confluent Schema Registry is one option: it supports JSON Schema, and the JsonSerializer from the confluent-kafka-dotnet project works with it.
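A sketch of the Schema Registry route, assuming the Confluent.SchemaRegistry.Serdes.Json NuGet package is installed. The CustomerChange model and the RegistryProducerFactory helper are hypothetical names for illustration:

```csharp
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Hypothetical model; the serializer works against a JSON Schema for this class.
public class CustomerChange
{
    public string CustomerName { get; set; }
    public string EventType { get; set; }
    public string ColumnsChanged { get; set; }
}

public static class RegistryProducerFactory
{
    // Builds a producer whose values are serialized by the Schema Registry JSON serializer.
    public static IProducer<string, CustomerChange> Create(
        ProducerConfig producerConfig, ISchemaRegistryClient schemaRegistry)
    {
        return new ProducerBuilder<string, CustomerChange>(producerConfig)
            .SetValueSerializer(new JsonSerializer<CustomerChange>(schemaRegistry))
            .Build();
    }
}
```

The registry client would typically be a CachedSchemaRegistryClient built from a SchemaRegistryConfig whose Url points at your registry (the address depends on your setup). Because this serializer is asynchronous, you would most likely send with ProduceAsync rather than Produce.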
