One to One instant messaging using Kafka

Time:03-30

I'm using Scala and Kafka to build a topic-based pub-sub architecture. My question is: how can I handle the one-to-one messaging part of my application using Kafka topics? This is my producer class:

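import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class Producer(topic: String, key: String, brokers: String, message: String) {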

  val producer = new KafkaProducer[String, String](configuration)

  private def configuration: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
    props
  }

  def sendMessages(): Unit = {
    val record = new ProducerRecord[String, String](topic, key, message)
    // send() is asynchronous; close() blocks until the pending record has been sent.
    producer.send(record)
    producer.close()
  }
}

And this is my consumer class:

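import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

class Consumer(brokers: String, topic: String, groupId: String) {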

  val consumer = new KafkaConsumer[String, String](configuration)
  consumer.subscribe(util.Arrays.asList(topic))

  private def configuration: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
    props
  }

  def receiveMessages(): Unit = {
    while (true) {
      // A zero timeout makes this loop spin; wait up to 500 ms for new records instead.
      consumer.poll(Duration.ofMillis(500)).forEach(record => println(s"Received message: $record"))
    }
  }
}

I also have an auth service that takes care of everything related to authenticating via JWT tokens. I am confused about how to address messages to specific users. I thought about creating a "Message" class, but I got lost when it came to sending messages to these specific users and partitioning the messages in Kafka for later retrieval:

class Message {

  def sendMessage(sender_id: String, receiver_id: String, content: String): Unit ={
    val newMessage = new Producer(brokers = KAFKA_BROKER, key = sender_id + " to " + receiver_id, topic = "topic_1", message = content)
    newMessage.sendMessages()
  }
  
  def loadMessage(): Unit ={
    //
  }

}

My thought was to specify a custom key for all messages belonging to the same conversation, but I couldn't find the right way to retrieve these messages later on, as my consumer returns everything contained in that topic no matter what the key is. This means all users will eventually get all the messages. I know my modeling seems messy, but I couldn't find the right way to do it. I'm also somewhat confused about the usage of the group_id in the consumer. Could someone explain the right way to achieve what I'm trying to do here, please?

CodePudding user response:

"couldn't find the right way to retrieve these messages later on ... consumer returns everything contained in that topic no matter what the key is"

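You would need to call .assign on the Consumer instance to pin it to a specific partition, rather than .subscribe, which lets the consumer group hand it any partition of the topic (and, with a single consumer in the group, every partition). Alternatively, you could use a separate topic for each conversation.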
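As a rough sketch of the .assign approach: this assumes a hypothetical conversationId helper that sorts the two user ids so both directions share one key, and a hypothetical partitionFor mapping that the producer and the reader both apply. The names Conversations, recordFor and ConversationReader, and the numPartitions parameter, are illustrative and not from the question. The reader still filters by key, because several conversations can land on the same partition.

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object Conversations {
  // Hypothetical helper: the same id no matter which of the two users sends.
  def conversationId(userA: String, userB: String): String =
    Seq(userA, userB).sorted.mkString(":")

  // Hypothetical mapping; numPartitions must equal the topic's partition count.
  def partitionFor(id: String, numPartitions: Int): Int =
    Math.floorMod(id.hashCode, numPartitions)

  // Builds a record with an explicit partition so the reader can compute the same one.
  def recordFor(topic: String, numPartitions: Int, sender: String,
                receiver: String, content: String): ProducerRecord[String, String] = {
    val id = conversationId(sender, receiver)
    new ProducerRecord[String, String](topic, Int.box(partitionFor(id, numPartitions)), id, content)
  }
}

class ConversationReader(brokers: String, topic: String, numPartitions: Int) {
  private def configuration: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    // No group.id is set: with manual assignment there is no group rebalancing,
    // and offsets are not committed in this sketch.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props
  }

  // Prints one polled batch of the conversation between the two users.
  def printBatch(userA: String, userB: String): Unit = {
    val id = Conversations.conversationId(userA, userB)
    val partition = new TopicPartition(topic, Conversations.partitionFor(id, numPartitions))
    val consumer = new KafkaConsumer[String, String](configuration)
    try {
      consumer.assign(Collections.singletonList(partition))
      consumer.seekToBeginning(Collections.singletonList(partition))
      // A single poll may not return the whole partition; a real reader would loop.
      consumer.poll(Duration.ofSeconds(1)).forEach { record =>
        if (record.key == id) println(s"${record.offset}: ${record.value}")
      }
    } finally consumer.close()
  }
}
```

Every read still scans a whole partition from the beginning, which is one reason this pattern gets expensive as history grows.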

But then you need a unique partition or topic for every conversation that will ever exist. In a typical chat application, where users create and remove rooms at will, that will not scale in Kafka.
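To put a rough number on that, the count of possible one-to-one conversations grows quadratically with the number of users. A small worked example (the object and method names are illustrative):

```scala
object ConversationCount {
  // Distinct unordered pairs among n users: n * (n - 1) / 2.
  def possiblePairs(users: Long): Long = users * (users - 1) / 2

  def main(args: Array[String]): Unit =
    Seq(100L, 10000L, 1000000L).foreach { n =>
      println(s"$n users -> ${possiblePairs(n)} possible one-to-one conversations")
    }
}
```

Even if only a small fraction of those pairs ever talk, one topic or partition per conversation quickly reaches counts well beyond what Kafka clusters are usually run with.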

Ultimately, I'd suggest writing your data to a store other than Kafka, one that you can actually query and index on a "conversationId" and/or user ids, rather than trying to forward those events directly from Kafka into your "chat" application.
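A minimal sketch of that idea, using an in-memory map as a stand-in for a real database. ChatMessage, InMemoryMessageStore and the conversationId scheme are all hypothetical; in practice a Kafka consumer would call append for each record it polls, and the chat application would call load against the database.

```scala
import scala.collection.mutable

// Hypothetical message shape; the question only has sender, receiver and content.
final case class ChatMessage(senderId: String, receiverId: String,
                             content: String, timestampMs: Long)

class InMemoryMessageStore {
  // Index: conversation id -> messages of that conversation.
  private val byConversation = mutable.Map.empty[String, Vector[ChatMessage]]

  // Same id for both directions of a one-to-one conversation.
  private def conversationId(a: String, b: String): String =
    Seq(a, b).sorted.mkString(":")

  // Called once per consumed record in a real pipeline.
  def append(message: ChatMessage): Unit = {
    val id = conversationId(message.senderId, message.receiverId)
    byConversation.update(id, byConversation.getOrElse(id, Vector.empty) :+ message)
  }

  // Returns only the messages exchanged between the two users, oldest first.
  def load(userA: String, userB: String): Vector[ChatMessage] =
    byConversation.getOrElse(conversationId(userA, userB), Vector.empty).sortBy(_.timestampMs)
}
```

With this split, a single consumer group can read the whole topic and write to the store, and which user may see which conversation becomes a query-time decision (for example, checked against the id in the JWT) rather than a Kafka partitioning problem.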
