Creating cancelable workers with goroutines and context

Time:04-12

I'm trying to understand how to properly use goroutines along with channels and context, to create a cancelable background worker.

I'm familiar with contexts that can be canceled explicitly, and attaching one to the worker goroutine should let me stop the worker.

But I can't figure out how to use it to achieve this.

The example below illustrates a worker goroutine that gets the data from a channel 'urls', and it also carries a cancelable context.

//worker.go
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d is starting\n", id)
    select {
    // placeholder for a channel writing the data from the URL
    case url := <-urls:
        fmt.Printf("Worker :%d received url :%s\n", id, url)
    // checking if the process is cancelled
    case <-ctx.Done():
        fmt.Printf("Worker :%d exiting..\n", id)
    }
    fmt.Printf("Worker :%d done..\n", id)
    wg.Done()
}

This doesn't work for me for two reasons:

  1. A send on an unbuffered channel blocks until a goroutine is ready to receive, so once the workers have returned, any further send on the urls channel blocks the sender.
  2. It returns as soon as either of the two select cases is ready, so each worker handles at most one URL.

I also tried wrapping the select in an infinite loop, but adding a break in the ctx.Done() case raises an error.

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d is starting\n", id)
    for {
        select {
        // placeholder for a channel writing the data from the URL
        case url := <-urls:
            fmt.Printf("Worker :%d received url :%s\n", id, url)
        // checking if the process is cancelled
        case <-ctx.Done():
            fmt.Printf("Worker :%d exiting..\n", id)
            break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
        }
    }
    fmt.Printf("Worker :%d done..\n", id) // code is unreachable
    wg.Done()
}

What is the right approach to implement something like this?

PS : Any resources / references about designing worker processes like these will be helpful too.

CodePudding user response:

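You can replace the break with return and the code will work, provided wg.Done() is deferred at the top of the function: the lines after the infinite loop are unreachable, so the existing wg.Done() call would never run.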
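A minimal sketch of that fix, under some assumptions: the unused http.Client parameter is dropped, the context is moved to the first parameter (the usual Go convention), and the `handled` counter and the `processAll` helper are hypothetical names added only so the result can be observed.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// worker handles URLs until ctx is canceled or urls is closed.
// handled is a hypothetical counter, not part of the original Worker.
func worker(ctx context.Context, id int, urls <-chan string, handled *int64, wg *sync.WaitGroup) {
	defer wg.Done() // runs on every return path, including cancellation
	for {
		select {
		case url, ok := <-urls:
			if !ok {
				return // channel closed: no more work
			}
			fmt.Printf("worker %d received url %s\n", id, url)
			atomic.AddInt64(handled, 1)
		case <-ctx.Done():
			fmt.Printf("worker %d exiting: %v\n", id, ctx.Err())
			return // return, not break: break would only leave the select
		}
	}
}

// processAll starts n workers (n must be at least 1), feeds them urls,
// then cancels the context and waits. It returns how many URLs were handled.
func processAll(n int, urls []string) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan string) // unbuffered: each send waits for a free worker
	var handled int64
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go worker(ctx, i, ch, &handled, &wg)
	}
	for _, u := range urls {
		ch <- u
	}
	cancel()  // tell every worker to stop
	wg.Wait() // returns only after each worker has called wg.Done
	return int(atomic.LoadInt64(&handled))
}

func main() {
	n := processAll(3, []string{"https://example.com/a", "https://example.com/b"})
	fmt.Println("handled:", n)
}
```

A labeled break (`break loop` with a `loop:` label on the for statement) would also leave the loop, but a plain return combined with a deferred wg.Done() tends to be easier to follow.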

However, a better approach can be:

  1. Workers consume the channel in a for/range loop.
  2. The producer is responsible for detecting the cancellation and closing the channel. Closing it ends every worker's range loop in turn.
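The two points above could be sketched roughly as follows. The names `produce`, `consume`, and `run` are hypothetical, and the workers only record the URLs they receive instead of fetching them.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// produce sends urls on out until they run out or ctx is canceled,
// then closes out so that every ranging worker stops.
func produce(ctx context.Context, urls []string, out chan<- string) {
	defer close(out) // only the producer closes the channel
	for _, u := range urls {
		select {
		case out <- u:
		case <-ctx.Done():
			return
		}
	}
}

// consume starts n workers that range over in and returns the URLs they saw.
func consume(n int, in <-chan string) []string {
	var (
		mu   sync.Mutex
		seen []string
		wg   sync.WaitGroup
	)
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for u := range in { // ends once in is closed and drained
				fmt.Printf("worker %d received url %s\n", id, u)
				mu.Lock()
				seen = append(seen, u)
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return seen
}

// run wires one producer to n workers and returns the number of URLs handled.
func run(n int, urls []string) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // calling cancel earlier would stop the producer mid-list

	ch := make(chan string)
	go produce(ctx, urls, ch)
	return len(consume(n, ch))
}

func main() {
	fmt.Println("handled:", run(2, []string{"https://example.com/a", "https://example.com/b"}))
}
```

With this layout the workers never look at the context themselves, so a worker always finishes the item it is currently handling. If in-flight work also has to be interrupted, the context would still need to be passed down to that work, for example through http.NewRequestWithContext.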

CodePudding user response:

I've made a Go package specifically for this. You can find it here: https://github.com/MicahParks/ctxerrpool

Here's the example from the project's README.md:

package main

import (
    "bytes"
    "context"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/MicahParks/ctxerrpool"
)

func main() {

    // Create an error handler that logs all errors.
    var errorHandler ctxerrpool.ErrorHandler
    errorHandler = func(pool ctxerrpool.Pool, err error) {
        log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
    }

    // Create a worker pool with 4 workers.
    pool := ctxerrpool.New(4, errorHandler)

    // Create some variables to inherit through a closure.
    httpClient := &http.Client{}
    u := "https://golang.org"
    logger := log.New(os.Stdout, "status codes: ", 0)

    // Create the worker function.
    var work ctxerrpool.Work
    work = func(ctx context.Context) (err error) {

        // Create the HTTP request.
        var req *http.Request
        if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
            return err
        }

        // Do the HTTP request.
        var resp *http.Response
        if resp, err = httpClient.Do(req); err != nil {
            return err
        }

        // Log the status code.
        logger.Println(resp.StatusCode)

        return nil
    }

    // Do the work 16 times.
    for i := 0; i < 16; i++ {

        // Create a context for the work.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // Send the work to the pool.
        pool.AddWorkItem(ctx, work)
    }

    // Wait for the pool to finish.
    pool.Wait()
}