How to read all messages from kafka using segmentio/kafka-go?

Time:08-18

I ran the example from the segmentio/kafka-go package documentation, but with it I get one message at a time.

Is there a way to read all the messages that have accumulated in Kafka at once and parse them directly into []MyType?


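package main

import (
    "context"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Consume messages from partition 0 of the topic.
    kafkaBrokerUrl := "localhost:9092"
    topic := "test"

    conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }
    defer conn.Close()

    batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    b := make([]byte, 10e3) // 10KB max per message
    for {
        // Read copies the value of the next message in the batch into b.
        n, err := batch.Read(b)
        if err != nil {
            break // io.EOF when the batch is exhausted, or a read error
        }
        fmt.Println(string(b[:n]))
    }

    if err := batch.Close(); err != nil {
        log.Fatal("failed to close batch:", err)
    }
}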

CodePudding user response:

Kafka by itself doesn't help with batch processing. If you mean something like a sliding-window mechanism that takes a batch from the stream for a particular time span or size, it is better to use Apache Kafka together with Apache Flink through the Kafka connector. Flink provides a sliding-window mechanism and HDFS storage, and it gives much better fault tolerance through checkpoints. Unfortunately, the available Flink Go SDK is unstable.
