func (c *client) Init(conf config.Config, topic string) (err error) {
defer func() {
p := recover()
switch v := p.(type) {
case error:
err = v
}
}()
c.reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: conf.Brokers,
GroupID: conf.GroupID,
Topic: topic,
MaxWait: 1 * time.Second, // maximum time to wait for new messages
MinBytes: 1, // minimum message size
MaxBytes: 10e6, // maximum message size 1 MByte (= maximum size Kafka can handle)
RetentionTime: time.Hour * 24 * 7, // keep ConsumerGroup for 1 week
WatchPartitionChanges: true, // watch for changes to the partitions (e.g. increase of partitions)
})
if conf.TlsEnabled {
d := &kafka.Dialer{
TLS: &tls.Config{},
}
}
return err
}
Long story short: what I wanna do is adding the field Dialer: d
to c.reader
if TlsEnabled is true! c.reader is of type ReaderConfig which already contains Dialer field which is in my case:
d := &kafka.Dialer{
TLS: &tls.Config{},
}
CodePudding user response:
If I understand your question correctly, you want to set the Dialer
field on kafka.ReaderConfig
if, and only if conf.TlsEnabled
is true. In that case, you should just move your if conf.TlsEnabled
check before you call kafka.NewReader
, and assign the kafka.ReaderConfig
to a variable, like so:
rConf := kafka.ReaderConfig{
Brokers: conf.Brokers,
GroupID: conf.GroupID,
Topic: topic,
MaxWait: 1 * time.Second, // maximum time to wait for new messages
MinBytes: 1, // minimum message size
MaxBytes: 10e6, // maximum message size 1 MByte (= maximum size Kafka can handle)
RetentionTime: time.Hour * 24 * 7, // keep ConsumerGroup for 1 week
WatchPartitionChanges: true, // watch for changes to the partitions (e.g. increase of partitions)
}
if conf.TlsEnabled {
rConf.Dialer = &kafka.Dialer{
TLS: &tls.Config{},
}
}
// now assign c.reader
c.reader = kafka.NewReader(rConf)
Just a small nit-pick: in golang, acronyms and other initialisms should be in all-caps. Your config type should not have a field called TlsEnabled
, but rather it should be TLSEnabled
.
CodePudding user response:
You can't add a field to an existing type but you can embed the client
in a custom type. The fields can then be accessed directly via the custom type
type myType struct {
something string
foo int
bar int
}
type myExtendedType struct {
myType
dialer *kafka.Dialer
}
func main() {
ext := myExtendedType{
myType: myType{
something: "some",
foo: 24,
bar: 42,
},
dialer: &kafka.Dialer{
TLS: &tls.Config{},
},
}
println(ext.something, ext.foo, ext.bar, ext.dialer)
}