Go Accept from the fastest worker function

Time:08-16

Please consider this question.

Currently I have one method that completes a job:

for {
  var result string 
  var resultOK bool
  result, resultOK = worker1(job) 
  if resultOK {
     // go on to other things
  }
  ...
}

Suppose we now have another mechanism that may be able to finish this job faster - let's call it worker2. Note that in some cases worker1 may be faster.

What's the idiomatic way to launch these two workers and accept the result from the one which finishes first successfully?

I know of the select mechanism, but it does not take the resultOK bool into account.

select {
case <- c1:
  // worker1 finished
case <- c2:
  // worker2 finished
case <- time.After(10 * time.Second):
  // we need to move on
}

Any advice would be much appreciated!

CodePudding user response:

This is usually solved by having every worker deliver its result on the same channel and receiving from that channel once: whichever worker is faster, its result is used. It is also a good idea to tell the remaining, slower workers that their work is no longer needed and that they should abort as soon as possible, e.g. by using a context.Context.

If you receive from the channel only once, take care not to block the slow workers when they eventually finish and try to send their result: they could block forever. Either give the channel a buffer large enough to hold every worker's result, or have the workers send using a select statement with a default branch, so that a send that cannot proceed is simply skipped.
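The approach described above could look roughly like the following sketch. The worker signature, firstResult, sleepy and demo are names made up for illustration; the sketch assumes each worker accepts a context and honors its cancellation. It uses a channel with a buffer of one together with a non-blocking send, so the first successful worker fills the buffer and later ones drop their result instead of blocking.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker is a hypothetical signature: the question's (result, ok) pair plus a
// context so the worker can stop early once its result is no longer needed.
type worker func(ctx context.Context, job string) (string, bool)

// firstResult starts all workers and returns the first successful result,
// or ("", false) if none arrives before the timeout.
func firstResult(job string, timeout time.Duration, workers ...worker) (string, bool) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // tells the remaining workers to abort

	results := make(chan string, 1) // room for exactly one winner

	for _, w := range workers {
		go func(w worker) {
			res, ok := w(ctx, job)
			if !ok {
				return // unsuccessful workers send nothing
			}
			select {
			case results <- res: // first successful worker fills the buffer
			default: // send cannot proceed: drop the result, never block
			}
		}(w)
	}

	select {
	case res := <-results:
		return res, true
	case <-ctx.Done():
		return "", false
	}
}

// sleepy builds a demo worker that succeeds after d unless it is cancelled.
func sleepy(name string, d time.Duration) worker {
	return func(ctx context.Context, job string) (string, bool) {
		select {
		case <-time.After(d):
			return name + " finished " + job, true
		case <-ctx.Done():
			return "", false
		}
	}
}

// demo races a slow worker1 against a fast worker2.
func demo() (string, bool) {
	return firstResult("job", time.Second,
		sleepy("worker1", 200*time.Millisecond),
		sleepy("worker2", 20*time.Millisecond))
}

func main() {
	fmt.Println(demo())
}
```

The buffer of one matters here: with an unbuffered channel, a non-blocking send could be dropped if the receiver happened not to be waiting yet.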

If a worker's result is not acceptable (e.g. an error occurred), the worker may simply not send anything. This needs care too: if all workers fail, no result is ever delivered and the receiver could wait forever. Avoid this by using a timeout, or by making the workers send a result that indicates failure; the receiver then has to keep receiving until it gets a good result or no more results are coming.
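As a sketch of the second option, the receiver below expects exactly one outcome per worker, success or failure, and keeps receiving until it sees a success, has heard from every worker, or hits a timeout. The outcome type, firstGood and demo are hypothetical names, not part of any library.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// outcome is a hypothetical result type: err is nil on success.
type outcome struct {
	value string
	err   error
}

// firstGood receives at most one outcome per worker and returns the first
// success. It stops when every worker has failed or when the timeout fires.
func firstGood(results <-chan outcome, workers int, timeout time.Duration) (string, error) {
	deadline := time.After(timeout)
	var lastErr error
	for received := 0; received < workers; received++ {
		select {
		case out := <-results:
			if out.err == nil {
				return out.value, nil
			}
			lastErr = out.err // remember the failure and keep receiving
		case <-deadline:
			return "", errors.New("timed out waiting for a result")
		}
	}
	return "", fmt.Errorf("all %d workers failed, last error: %w", workers, lastErr)
}

// demo simulates a worker that fails quickly and one that succeeds later.
func demo() (string, error) {
	results := make(chan outcome, 2) // buffered so workers never block on send
	go func() {
		time.Sleep(10 * time.Millisecond)
		results <- outcome{err: errors.New("worker1 failed")}
	}()
	go func() {
		time.Sleep(50 * time.Millisecond)
		results <- outcome{value: "worker2 result"}
	}()
	return firstGood(results, 2, time.Second)
}

func main() {
	fmt.Println(demo())
}
```

Because every worker reports back, the receiver can return as soon as the last failure arrives instead of waiting for the full timeout.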

CodePudding user response:

I know of the select mechanism, but it does not take the resultOK bool into account.

You can define a custom struct type that combines result and resultOK, and use that type as the channel's element type.
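A minimal sketch of that idea, keeping the two channels and the timeout from the question's select. The jobResult type, pick and demo are invented for this example. When a worker reports ok == false, its channel variable is set to nil; a receive from a nil channel blocks forever, so select stops considering that case.

```go
package main

import (
	"fmt"
	"time"
)

// jobResult bundles the question's two return values so they can travel over
// a channel together. The name is made up for this sketch.
type jobResult struct {
	value string
	ok    bool
}

// pick waits on both channels and returns the first result whose ok flag is set.
func pick(c1, c2 <-chan jobResult, timeout time.Duration) (string, bool) {
	deadline := time.After(timeout)
	for c1 != nil || c2 != nil {
		select {
		case r := <-c1:
			if r.ok {
				return r.value, true
			}
			c1 = nil // worker1 failed: a nil channel is never selected again
		case r := <-c2:
			if r.ok {
				return r.value, true
			}
			c2 = nil // worker2 failed
		case <-deadline:
			return "", false // we need to move on
		}
	}
	return "", false // both workers failed
}

// demo simulates worker1 failing immediately and worker2 succeeding later.
func demo() (string, bool) {
	c1 := make(chan jobResult, 1)
	c2 := make(chan jobResult, 1)
	go func() { c1 <- jobResult{ok: false} }()
	go func() {
		time.Sleep(30 * time.Millisecond)
		c2 <- jobResult{value: "from worker2", ok: true}
	}()
	return pick(c1, c2, time.Second)
}

func main() {
	fmt.Println(demo())
}
```

A closed channel yields the zero jobResult, whose ok field is false, so in this sketch a worker that closes its channel without sending is treated as failed.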

CodePudding user response:

An example using code:

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    result, err := RunConcurrent(1000, TimeConsumingWork)
    fmt.Println("result =", result, "error =", err)
}

type workFn func(ctx context.Context, input any) (any, error)

// RunConcurrent runs fn on n goroutines and returns the first successful result
func RunConcurrent(n int, fn workFn) (any, error) {

    wg := sync.WaitGroup{}
    wg.Add(n)

    resultChan := make(chan any, n) // there are potentially `n` results

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // this will signal other routines to stop

    for i := 0; i < n; i++ {
        go func(v any) { // run fn in a new goroutine
            defer wg.Done() // signal this routine is done

            result, err := fn(ctx, v)
            if err != nil {
                return
            }

            select { // try to store result on result channel. If ctx.Done, stop trying.
            case resultChan <- result:
            case <-ctx.Done():
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(resultChan) // close only after ALL goroutines are done: sending on a closed channel panics
    }()

    v, ok := <-resultChan // ok is false if the channel was closed without any result being sent
    if !ok {
        return nil, errors.New("no results")
    }
    return v, nil
}

func TimeConsumingWork(ctx context.Context, input any) (any, error) {
    simulatedExecutionTime := time.Duration(10+rand.Intn(50)) * time.Millisecond

    select { // do time consuming work, until context is done
    case <-time.After(simulatedExecutionTime):
        if rand.Intn(2) == 0 {
            return nil, errors.New("simulated a random error")
        }
        return input, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}