Concurrent handler is blocking

Time:09-24

We found that one mqtt.MessageHandler is not working properly. The handler filters incoming messages and passes each valid event to a function for processing. That function is implemented as follows:

func processEvent(i models.Foo) (string, error) {
    var wg sync.WaitGroup
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)

    err := func1()
    if err != nil {
        return "", err
    }

    switch strings.ToUpper(i.Status) {
    case "OK":
        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask1()
            ch := done
            if err != nil {
                log.Error("%s", err.Error())
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask2()
            ch := done
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        result := "processed"
        count := 0
        for {
            select {
            case err := <-errc:
                close(quit)
                log.Info("event: %s, %s", "", err.Error())
                return "", err
            case <-done:
                count++
                if count == 4 { // why 4???
                    return result, nil
                }
            }
        }

        wg.Wait()

        if err != nil {
            log.Info("event: %s, %s", result, err.Error())
            return result, err
        }
        close(quit)
        close(errc)
        close(done)
        return result, nil
    default:
        return "", nil
    }

    return "", nil
}

I understand that it is trying to synchronize longTimeTask1() and longTimeTask2(), but the code is hard for me to follow. What is the purpose of count and the count == 4 check? Why are the channels closed at the end? According to my editor's hint, wg.Wait() is unreachable. This function used to work well, but recently longTimeTask1() or longTimeTask2() started returning errors occasionally, and now the function seems to block completely. Could you please help me understand the code, find the potential issues, and refactor this part?

CodePudding user response:

Looking at count, it appears that the code expects to receive four messages from the done channel. However, the two goroutines can produce at most two such messages, so the loop can never reach count == 4. That's a bug.

Also, if either goroutine returns an error, it will not write to the done channel, so that's another bug.

Another way to write this could be:

...
result := "processed"
count := 0
for {
    select {
    case err := <-errc:
        close(quit) // Tell the goroutines to terminate
        log.Info("event: %s, %s", "", err.Error())
        wg.Wait() // Wait for them to finish
        return "", err

    case <-done:
        count++
        if count == 2 { // one message per goroutine
            wg.Wait()
            return result, nil
        }
    }
}

CodePudding user response:

This is exactly the sort of fork-and-join concurrency that the errgroup package (golang.org/x/sync/errgroup) was designed for:

func processEvent(ctx context.Context, i models.Foo) (string, error) {
    err := func1()
    if err != nil {
        return "", err
    }

    g, ctx := errgroup.WithContext(ctx)

    if strings.ToUpper(i.Status) != "OK" {
        return "", nil
    }

    g.Go(func() error { return longTimeTask1(ctx) })
    g.Go(func() error { return longTimeTask2(ctx) })

    if err := g.Wait(); err != nil {
        log.Printf("event: %v", err)
        return "", err
    }
    return "processed", nil
}

(https://play.golang.org/p/JNMKftQTLGs)
