I am running the example from the segmentio/kafka-go package documentation, but it reads only one message at a time. Is there any way to read all the messages that have accumulated in Kafka in one go and parse them directly into []MyType?
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// to consume messages
	kafkaBrokerUrl := "localhost:9092"
	topic := "test"
	conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	defer conn.Close()

	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	defer batch.Close()

	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		fmt.Println(string(b[:n]))
	}
}
CodePudding user response:
Kafka itself doesn't help with batch processing. If you mean something like a sliding-window mechanism that gathers a batch of the stream by time or size, it is better to pair Apache Kafka with Apache Flink via the Kafka connector: Flink provides sliding windows, HDFS storage, and much better fault tolerance through checkpoints. Unfortunately, the Flink Go SDK currently available is unstable.