I am a newbie in Golang and I'm trying to write a Kafka producer.
I'm receiving messages on the pch channel. As soon as I get them, I want to send them to a Kafka topic.
This code has two issues:
- Inside select, I am getting the warning: should use for range instead of for { select {} } (S1000) go-staticcheck
- After select, I am getting the warning: unreachable code
Will you help me understand how to avoid these warnings and write better code?
func SendMsg(pch <-chan ProduceMsg) {
    p, err := kafka.NewProducer(k.getConfigMap())
    if err != nil {
        log.Fatalf("Failed to create producer: %s\n", err)
    }
    log.Printf("Created Producer %v\n", p)

    // Report delivery results asynchronously.
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                m := ev
                if m.TopicPartition.Error != nil {
                    log.Printf("Delivery failed in producer: %v\n", m.TopicPartition.Error)
                } else {
                    log.Printf("Delivered message in producer to topic %s [%d] at offset %v\n",
                        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
                }
            case kafka.Error:
                log.Printf("Error in reading producer event: %v\n", ev)
            default:
                log.Printf("Ignored producer event: %s\n", ev)
            }
        }
    }()

    var msgcnt int = 0
    for {
        select {
        // Getting static check Error. How to avoid this?
        case value := <-pch:
            err = p.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: k.topic, Partition: kafka.PartitionAny},
                Value:          []byte(value),
                Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
            }, nil)
            if err != nil {
                if err.(kafka.Error).Code() == kafka.ErrQueueFull {
                    time.Sleep(time.Second)
                    continue
                }
                fmt.Printf("Failed to produce message: %v\n", err)
            }
            msgcnt++
        }
    }

    // This code is unreachable. How to avoid this?
    for p.Flush(10000) > 0 {
        fmt.Print("Still waiting to flush outstanding messages\n", err)
    }
    p.Close()
}
Answer:
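The unreachable-code warning appears because there is no way out of the for loop above: it contains no break or return, so nothing after it can ever run. That looks like a bug.
The for-select warning is telling you that a select with a single receive case inside a for loop can be replaced with a for-range loop over the channel:
for value := range pch {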
...
}
This reads from pch until pch is closed, which gets rid of both the unreachable-code warning and the staticcheck warning. The semantics are different, though: the original loop never exits, whereas the for-range loop ends as soon as the sender closes pch. You have to decide whether that is really what you need.