Why is my function not waiting for the goroutines to complete?

Time:09-27

I have a function that makes a GET request and then stores both the response body and its base64-encoded form in a struct. It takes a pointer to a sync.WaitGroup.

Here is that function:

import (
    b64 "encoding/base64"
    "io"
    "net/http"
    "sync"

    "github.com/pkg/errors"
)

type EncodedData string

type EncodedImage struct {
    Data        []byte
    EncodedData EncodedData
    Error       error
}

func GetPainting(url string, EI *EncodedImage, wg *sync.WaitGroup) {
    defer wg.Done()

    res, err := http.Get(url)
    if err != nil {
        EI.Error = errors.Wrapf(err, "unable to fetch from provided url %s", url)
        return // res is nil here, so res.Body must not be touched
    }
    defer res.Body.Close()

    body, err := io.ReadAll(res.Body)
    if err != nil {
        EI.Error = err
        return
    }

    encoded := b64.StdEncoding.EncodeToString(body)
    EI.Data, EI.EncodedData = body, EncodedData(encoded)
}

Here is the function that calls the previous function. It's a handler for a gin router.

func Search(db *gorm.DB) gin.HandlerFunc {
    return func(c *gin.Context) {
        
        // this is just receiving a search term, making a query, and then loading it into "results". 
        term := c.Param("term")
        var results []models.Searches
        db.Table("searches").Where("to_tsvector(\"searches\".\"Title\" || '' || \"searches\".\"Artist_Name\") @@ plainto_tsquery(?)", term).Find(&results)

        
        var wg sync.WaitGroup

        // results is an slice of structs
        for i, re := range results {

            var ed EncodedImage
            wg.Add(1)

            // here is the function defined above
            go GetPainting(re.IMG, &ed, &wg)
            if ed.Error != nil {
                c.JSON(http.StatusInternalServerError, ed.Error.Error())
                panic(ed.Error)
            }

            results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData)
        }

        wg.Wait()
        c.JSON(http.StatusOK, results)
    }
}

The JSON response shows only "data:image/jpeg;base64," with nothing after it, which suggests the goroutines aren't being waited on until they complete.

This all works without using additional goroutines. In other words, things stopped working when I introduced the go keyword. I wanted to try this to speed things up. Any insight or advice is greatly appreciated!

Answer:

The issue is here:

go GetPainting(re.IMG, &ed, &wg) // goroutine alters ed
...
results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData)

"A go statement starts the execution of a function call as an independent concurrent thread of control..." (Go language specification); you should not make any assumptions about when the goroutine will perform any action. So what might happen (I have not looked at exactly how goroutines are currently scheduled) is something like:

  1. go GetPainting(re.IMG, &ed, &wg): the runtime schedules GetPainting to run.
  2. results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData) runs (ed.EncodedData is still the empty string).
  3. GetPainting runs.

You have created a data race; that is, one goroutine writes to ed.EncodedData while another reads from it without synchronisation. Generally it's difficult to predict what will happen when there is a race, but in this case your goroutine performs I/O (http.Get), so it's very likely that the write will happen after the read.

To help explain this (and potential solutions), let's simplify your example:

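package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// routine stands in for GetPainting: it does some slow work, then writes to *val.
func routine(wg *sync.WaitGroup, val *int) {
    defer wg.Done()
    time.Sleep(time.Microsecond)
    *val = rand.Int()
}

func main() {
    const iterations = 5
    var wg sync.WaitGroup
    wg.Add(iterations)
    r := make([]int, iterations)
    results := make([]string, iterations)
    for i := 0; i < iterations; i++ {
        go routine(&wg, &r[i])
        // Deliberate bug (data race): r[i] is read before the goroutine has written it.
        results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
    }
    wg.Wait()
    for i := 0; i < iterations; i++ {
        fmt.Println(r[i], results[i])
    }
}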

As you will see, after the WaitGroup is done r (similar to your ed) is populated, but every entry in results ends in 0. This points towards a simple solution:

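// Replaces the loops in main above; results is now built after wg.Wait().
for i := 0; i < iterations; i++ {
    go routine(&wg, &r[i])
}
wg.Wait() // every goroutine has now finished writing to r
results := make([]string, iterations)
for i := 0; i < iterations; i++ {
    results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
}
for i := 0; i < iterations; i++ {
    fmt.Println(r[i], results[i])
}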

This works because you are not accessing anything that the goroutines write to before you know that they are finished (via the WaitGroup). It's fairly simple to transfer this method into your code (create a slice of utils.EncodedImage and check for errors/results after the wg.Wait()).

While the above works, it cannot return until every goroutine has finished. Often that is not desirable; for instance, if a single error is fatal, you probably want to return a response to the user (and stop any ongoing work) as soon as that error is received.

There are a range of ways to deal with this. Passing a context.Context to functions is a very common way of signalling when they should stop (for your use case, see http.NewRequestWithContext). When it comes to handling the responses, you can code this yourself (but it is easy to leak goroutines) or use something like golang.org/x/sync/errgroup. Here is an example:

package main

import (
    "context"
    "errors"
    "fmt"
    "math"
    "math/rand"
    "time"

    "golang.org/x/sync/errgroup"
)

// simulateErrors makes routine fail for large random numbers so the error path can be seen.
const simulateErrors = true

func routine(ctx context.Context, val *int) error {
    select {
    case <-time.After(time.Microsecond * time.Duration(rand.Intn(20))): // select will exit after a random number of microseconds
    case <-ctx.Done(): // unless this is met (operation cancelled)
        fmt.Println("Goroutine ending due to context")
        return ctx.Err()
    }
    *val = rand.Int()
    fmt.Println("generated", *val)
    if simulateErrors && *val > (math.MaxInt/2) {
        return errors.New("number too big")
    }
    return nil
}

func main() {
    const iterations = 5
    // In your case the source context should probably come from the gin.Context (c.Request.Context()) so the operation is cancelled if the connection drops
    g, ctx := errgroup.WithContext(context.Background())
    r := make([]int, iterations)
    for i := 0; i < iterations; i++ {
        x := &r[i] // each goroutine gets a pointer to its own element
        g.Go(func() error {
            return routine(ctx, x)
        })
    }
    if err := g.Wait(); err != nil {
        fmt.Println("Got an error!", err)
        return // Here you send error as response (you might want to send something generic to avoid leaking system detail)
    }
    // Everything has processed OK
    results := make([]string, iterations)
    for i := 0; i < iterations; i++ {
        results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
        fmt.Println(r[i], results[i])
    }
}

Note: Be careful using panic in production code. In your example you are doing this when an HTTP GET fails; that is likely to happen at some point, and you don't really want your application to shut down when it does (return a sensible error to the end user and perhaps log the failure). It is possible to catch panics, but it's generally best to deal with errors as they are detected.

Answer:

@Brits had the right answer in the comments above: setting results[i].IMG within the goroutine was the right solution. I also added the error handling he suggested. Here is the updated code for anyone who needs it:

Note: I made GetPainting a method of EncodedImage so that calls to it read more naturally. It returns an error so that it can be run with errgroup.Group.Go().

func (EI *EncodedImage) GetPainting(url string, result *models.Searches) error {
    res, err := http.Get(url)
    if err != nil {
        return err
    }
    defer res.Body.Close()

    body, err := io.ReadAll(res.Body)
    if err != nil {
        return err
    }

    encoded := b64.StdEncoding.EncodeToString(body)
    EI.Data, EI.EncodedData = body, EncodedData(encoded)

    // Each goroutine writes only to its own element of results, so no extra locking is needed.
    result.IMG = fmt.Sprintf("data:image/jpeg;base64,%v", EI.EncodedData)

    return nil
}

func Search(db *gorm.DB) gin.HandlerFunc {
    return func(c *gin.Context) {
        term := c.Param("term")
        var results []models.Searches
        db.Table("searches").Where("to_tsvector(\"searches\".\"Title\" || '' || \"searches\".\"Artist_Name\") @@ plainto_tsquery(?)", term).Find(&results)

        var g errgroup.Group

        for i, re := range results {
            i, re := i, re // per-iteration copies for the closure (needed before Go 1.22)

            // g.Go takes a func() error, so wrap the call instead of passing its result.
            g.Go(func() error {
                var ed utils.EncodedImage
                return ed.GetPainting(re.IMG, &results[i])
            })
        }

        // Wait once, after every goroutine has been started; Wait returns the first non-nil error.
        if err := g.Wait(); err != nil {
            c.JSON(http.StatusInternalServerError, err.Error())
            return
        }
        c.JSON(http.StatusOK, results)
    }
}