Home > Net >  ksqldb - exactly one delivery for pull query, multiple app instances
ksqldb - exactly one delivery for pull query, multiple app instances

Time:02-16

I am trying to build an application on top of ksqldb.

Let's say I will have a simple producer:

package main

import (
    "fmt"
    "net/http"
    "strings"

    "github.com/rmoff/ksqldb-go"
)

// client is the package-level ksqlDB client used by init and by the /emit
// handler. It targets the ksqlDB server at http://localhost:8088; the two
// empty strings are presumably credentials (none set) and Debug() presumably
// turns on verbose client output — confirm both against the ksqldb-go API.
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()

// init prepares ksqlDB before main runs. It executes two statements in order:
// first it sets 'auto.offset.reset' to 'earliest', then it creates (or
// replaces) the userEvents stream. If either statement fails, init panics
// with the returned error.
func init() {
    const createStream = `
        CREATE OR REPLACE STREAM userEvents (
            userId VARCHAR KEY,
            eventType VARCHAR
        )
        WITH (
            kafka_topic='user_events', 
            value_format='json', 
            partitions=8
        );
    `

    statements := []string{
        `SET 'auto.offset.reset' = 'earliest';`,
        createStream,
    }
    for _, stmt := range statements {
        if err := client.Execute(stmt); err != nil {
            panic(err)
        }
    }
}

// main registers the /emit handler and serves HTTP on port 4201.
//
// http.ListenAndServe only returns when the server cannot run (for example
// when the port is already in use), and the error it returns is always
// non-nil. That error is surfaced here; dropping it would let the process
// exit silently with status 0.
func main() {
    http.HandleFunc("/emit", hello)
    if err := http.ListenAndServe(":4201", nil); err != nil {
        panic(err)
    }
}

// hello handles /emit requests. It reads the "userId" and "event" query
// parameters and inserts one row into the userEvents stream.
//
// Responses:
//   - 400 when userId is missing or empty
//   - 500 (with the error text) when the INSERT fails
//   - 200 on success
//
// A missing or empty "event" parameter is recorded as "unknown".
func hello(w http.ResponseWriter, req *http.Request) {
    params := req.URL.Query()

    userID := params.Get("userId")
    if userID == "" {
        http.Error(w, "no userId", http.StatusBadRequest)
        return
    }
    userEvent := params.Get("event")
    if userEvent == "" {
        userEvent = "unknown"
    }

    // Both values come straight from the request. They are escaped before
    // being placed inside the quoted literals so that a single quote in the
    // input cannot terminate the literal and alter the statement.
    stmt := fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",
        escapeSQLLiteral(userID), escapeSQLLiteral(userEvent))
    if err := client.Execute(stmt); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusOK)
}

// escapeSQLLiteral doubles every single quote in s, which is how a quote is
// written inside a single-quoted SQL string literal.
func escapeSQLLiteral(s string) string {
    return strings.ReplaceAll(s, "'", "''")
}

This app creates one stream of data and exposes one endpoint to populate the stream with the data.

Also, I have a consumer:

package main

import (
    "context"
    "fmt"
    "github.com/rmoff/ksqldb-go"
)

// client is the package-level ksqlDB client used by main. It targets the
// ksqlDB server at http://localhost:8088; the two empty strings are presumably
// credentials (none set) and Debug() presumably turns on verbose client
// output — confirm both against the ksqldb-go API.
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()

// main subscribes to the userEvents stream with a push query and prints the
// header followed by every row it receives.
//
// It first sets 'auto.offset.reset' to 'earliest', then starts the push query
// in a goroutine that delivers results on the rows and headers channels. Any
// error from Execute or Push panics.
func main() {
    query := `SET 'auto.offset.reset' = 'earliest';`
    if err := client.Execute(query); err != nil {
        panic(err)
    }

    ctx := context.TODO()
    rows := make(chan ksqldb.Row)
    headers := make(chan ksqldb.Header)
    go func() {
        if err := client.Push(ctx,
            "SELECT * FROM userEvents EMIT CHANGES;",
            rows,
            headers); err != nil {
            panic(err)
        }
    }()

    h := <-headers
    fmt.Printf("headers: [%v]", h)

    // Ranging over the channel behaves like a blocking receive while the
    // channel is open, and ends the loop if the channel is closed. A bare
    // `for { select { case r := <-rows: } }` would instead spin forever on
    // zero-value rows once the channel is closed.
    for r := range rows {
        fmt.Printf("received event: [%v]", r)
    }
}

I run one producer and multiple consumers, all with the same query. How can I (and is it even possible to) receive each event on only one consumer? With this setup every event is delivered to all running consumers, but I would like each event to be processed by a single one (the processing will take quite long, so I need this for parallelism).

To be honest I thought it's "standard", that all connected apps belong to the same group, and this kind of delivery I have for free.

The local cluster configuration (it's the standard config from the Confluent quickstart):

---
# Local single-node stack: ZooKeeper, one Kafka broker, a ksqlDB server and
# the ksqlDB CLI container.
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      # Only the PLAINTEXT_HOST listener port is published to the host.
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Two advertised addresses: broker:9092 (the address ksqldb-server is
      # configured with below) and localhost:29092 (matches the published port).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      # Replication factors and min ISR are 1; this file defines a single broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.23.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      # ksqlDB listener port, published on host port 8088.
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.23.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    # The entrypoint is replaced with a shell and a TTY is allocated,
    # presumably so the container stays up for interactive `docker exec`
    # sessions — confirm.
    entrypoint: /bin/sh
    tty: true

Is something wrong with my configuration or do I misunderstand the usage of this database? Thanks for your help!

CodePudding user response:

First up, note that I no longer maintain that client, and you might want to check out https://github.com/thmeitz/ksqldb-go instead.


Now onto your question. If I'm understanding correctly you want to run multiple instances of the same logical consumer for parallelism purposes, and thus each message should be processed by that logical consumer once.

If that's the case then you are describing what is called a consumer group in Kafka. Multiple instances of a consumer identify themselves with the same group ID (the group.id setting) and Kafka ensures that data from across the source topic's partitions is routed to the available consumers within that group. If there are four consumers and eight partitions, each consumer is going to get the data from two partitions. If one consumer leaves the group (it crashes, you scale down, etc) then Kafka reassigns that consumer's partitions across the remaining consumers within the group.

This is different behaviour from what you are seeing, in which you are effectively instantiating multiple independent consumers. By design, Kafka ensures that each consumer that is subscribed to a topic receives all of the messages on that topic.

I'm deliberately talking about Kafka here, and not ksqlDB. That's because ksqlDB is built on Kafka and in order to make sense of what you are seeing it's important to explain the underpinning fundamentals.

To get the behaviour that you're looking for you probably want to look at using the Consumer API directly in your consumer application. You can see an example of the Consumer API in this quickstart for Golang and Kafka. To create a consumer group you give every instance of your application the same group.id, one that no other application uses.

  • Related