We found that one of our mqtt.MessageHandler implementations is not working properly. In the handler, we filter the incoming messages and pass each valid event to a func for processing. The func is implemented as below:
func processEvent(i models.Foo) (string, error) {
	var wg sync.WaitGroup
	quit := make(chan bool)
	errc := make(chan error)
	done := make(chan error)
	err := func1()
	if err != nil {
		return "", err
	}
	switch strings.ToUpper(i.Status) {
	case "OK":
		wg.Add(1)
		go func() {
			defer wg.Done()
			err = longTimeTask1()
			ch := done
			if err != nil {
				log.Error("%s", err.Error())
				ch = errc
			}
			select {
			case ch <- err:
				return
			case <-quit:
				return
			}
		}()
		wg.Add(1)
		go func() {
			defer wg.Done()
			err = longTimeTask2()
			ch := done
			if err != nil {
				ch = errc
			}
			select {
			case ch <- err:
				return
			case <-quit:
				return
			}
		}()
		result := "processed"
		count := 0
		for {
			select {
			case err := <-errc:
				close(quit)
				log.Info("event: %s, %s", "", err.Error())
				return "", err
			case <-done:
				count++
				if count == 4 { // why 4???
					return result, nil
				}
			}
		}
		wg.Wait()
		if err != nil {
			log.Info("event: %s, %s", result, err.Error())
			return result, err
		}
		close(quit)
		close(errc)
		close(done)
		return result, nil
	default:
		return "", nil
	}
	return "", nil
}
I understand that it is trying to synchronize longTimeTask1() and longTimeTask2(), but it is quite complex for me to follow. What is the purpose of count and the count == 4 check? Why are the channels closed at the end? According to the code hint in my editor, wg.Wait() is unreachable.
This func used to work well, but recently longTimeTask1() or longTimeTask2() may return an error, which breaks the code: the func seems to block completely. Could you please help me understand the code, find the potential issues, and refactor this part?
CodePudding user response:
Looking at count, it appears that the code expects to receive four messages from the done channel. However, the two goroutines can produce at most two such messages, so that's a bug: as written, count can never reach 4, and the success path never returns.
Also, if any of the goroutines returns an error, it will not write to the done channel, so that's another bug.
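To make the first bug concrete, here is a minimal sketch of a receiver that waits for more done messages than there are senders. The name waitForDone, its parameters, and the timeout are hypothetical and exist only for this illustration. The timeout is what lets the demo return at all, because the loop in the question has no such escape hatch.

```go
package main

import "time"

// waitForDone starts `workers` goroutines that each send exactly one value on
// done, then tries to receive `want` values. It reports whether all `want`
// values arrived before the timeout expired.
func waitForDone(workers, want int, timeout time.Duration) bool {
	// Buffered so that the senders in this demo never block or leak.
	done := make(chan error, workers)
	for i := 0; i < workers; i++ {
		go func() { done <- nil }()
	}

	deadline := time.After(timeout)
	for count := 0; count < want; {
		select {
		case <-done:
			count++
		case <-deadline:
			// Fewer than `want` messages will ever arrive.
			return false
		}
	}
	return true
}
```

With two workers, asking for two messages should succeed, while asking for four can only end through the timeout branch.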
Another way to write this could be:
...
result := "processed"
count := 0
for {
	select {
	case err := <-errc:
		close(quit) // Tell the goroutines to terminate
		log.Info("event: %s, %s", "", err.Error())
		wg.Wait() // Wait for them to finish
		return "", err
	case <-done:
		count++
		if count == 2 { // Both goroutines reported success
			wg.Wait()
			return result, nil
		}
	}
}
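For reference, a self-contained variant of the loop above might look like the sketch below. The names runAll, succeed, fail, and errFailed are hypothetical; the task funcs stand in for longTimeTask1() and longTimeTask2(). Besides counting one done message per goroutine, the sketch gives each goroutine its own err variable, since in the question both goroutines assign to the shared outer err, which is a data race.

```go
package main

import (
	"errors"
	"sync"
)

// errFailed, succeed and fail are hypothetical helpers for trying runAll.
var errFailed = errors.New("task failed")

func succeed() error { return nil }
func fail() error    { return errFailed }

// runAll runs every task concurrently. It returns "processed" only if all
// tasks succeed; otherwise it returns the first error it receives.
func runAll(tasks ...func() error) (string, error) {
	var wg sync.WaitGroup
	quit := make(chan struct{})
	errc := make(chan error)
	done := make(chan struct{})

	for _, task := range tasks {
		task := task // per-iteration copy (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := task() // local to this goroutine, no shared state
			if err != nil {
				select {
				case errc <- err:
				case <-quit:
				}
				return
			}
			select {
			case done <- struct{}{}:
			case <-quit:
			}
		}()
	}

	// Expect exactly one done message per started goroutine.
	for count := 0; count < len(tasks); {
		select {
		case err := <-errc:
			close(quit) // release goroutines still blocked on a send
			wg.Wait()
			return "", err
		case <-done:
			count++
		}
	}
	wg.Wait()
	return "processed", nil
}
```

Calling runAll(succeed, succeed) takes the success path, and runAll(succeed, fail) takes the error path without leaving a goroutine blocked.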
CodePudding user response:
This is exactly the sort of fork-and-join concurrency that the errgroup package (golang.org/x/sync/errgroup) was designed for:
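func processEvent(ctx context.Context, i models.Foo) (string, error) {
	err := func1()
	if err != nil {
		return "", err
	}
	// Check the status first so no group context is created on the early return.
	if strings.ToUpper(i.Status) != "OK" {
		return "", nil
	}
	// The derived ctx is cancelled as soon as one task returns an error.
	g, ctx := errgroup.WithContext(ctx)
	// This assumes both tasks are changed to accept a context and honor its cancellation.
	g.Go(func() error { return longTimeTask1(ctx) })
	g.Go(func() error { return longTimeTask2(ctx) })
	// Wait blocks until both tasks return and yields the first non-nil error.
	if err := g.Wait(); err != nil {
		log.Printf("event: %v", err)
		return "", err
	}
	return "processed", nil
}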
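The version above only stops early if the tasks actually watch the context they are given. The sketch below shows one way that could look, using only the standard library. The names slowTask, firstError, demo, and errBoom are hypothetical, and firstError is a rough approximation of the behavior described for errgroup.WithContext, not its actual implementation.

```go
package main

import (
	"context"
	"errors"
	"sync"
	"time"
)

var errBoom = errors.New("boom") // hypothetical task failure

// slowTask stands in for longTimeTask1/longTimeTask2. It "works" for d and
// then returns result, but gives up early if ctx is cancelled.
func slowTask(ctx context.Context, d time.Duration, result error) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-t.C:
		return result
	case <-ctx.Done():
		return ctx.Err()
	}
}

// firstError runs all tasks concurrently, cancels the shared context as soon
// as one of them fails, waits for all of them, and returns the first error.
func firstError(ctx context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, task := range tasks {
		task := task // per-iteration copy (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					first = err
					cancel() // tell the remaining tasks to stop
				})
			}
		}()
	}
	wg.Wait()
	return first
}

// demo pairs a task that fails quickly with one that would otherwise run for
// five seconds, and reports how long the whole call took.
func demo() (time.Duration, error) {
	start := time.Now()
	err := firstError(context.Background(),
		func(ctx context.Context) error { return slowTask(ctx, 10*time.Millisecond, errBoom) },
		func(ctx context.Context) error { return slowTask(ctx, 5*time.Second, nil) },
	)
	return time.Since(start), err
}
```

Because the slow task selects on ctx.Done(), the failure of the fast task should end the whole call well before the five seconds elapse. A task that ignores its context would still be waited for in full.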