Please consider this question.
Currently I have one method that completes a job:
for {
	var result string
	var resultOK bool
	result, resultOK = worker1(job)
	if resultOK {
		// go on to other things
	}
	...
}
Suppose we now have another mechanism that may be able to finish this job faster; let's call it worker2. Note that in some cases worker1 may be faster.
What's the idiomatic way to launch these two workers and accept the result from the one which finishes first successfully?
I know of the select mechanism, but it does not take the resultOK bool into account.
select {
case <-c1:
	// worker1 finished
case <-c2:
	// worker2 finished
case <-time.After(10 * time.Second):
	// we need to move on
}
Any advice would be much appreciated!
Answer:
Usually this is solved by making each worker deliver its result on the same channel and receiving from that channel (once). Whichever worker is faster, its result will be used. It's also a good idea to signal the other, slower workers (e.g. by cancelling a context.Context) that their work is no longer needed and that they should abort as soon as possible.
If you only receive once from the channel, care must be taken not to block the slow workers when they finish their work and try to send the result: they may get blocked forever. So either the channel should have a large enough buffer that no worker blocks, or the workers should send using a select statement with a default branch, so they don't get blocked when the send operation cannot proceed.
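A minimal sketch of the select-with-default send. The helper trySend is a hypothetical name, and the workers just sleep. It assumes a channel buffer of 1: with an unbuffered channel, even the fastest worker's send would hit the default branch and be dropped if the receiver happened not to be waiting at that instant. With one slot, the first finisher fills it and every later send falls through to default.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// trySend delivers v only if the send can proceed right now (a free buffer
// slot or a waiting receiver). Otherwise the default branch runs, the value
// is dropped, and the caller never blocks.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	results := make(chan int, 1) // holds the winner even before anyone receives
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			time.Sleep(time.Duration(id) * 10 * time.Millisecond) // simulated work
			if !trySend(results, id) {
				fmt.Println("worker", id, "finished late, result dropped")
			}
		}(i)
	}
	wg.Wait() // returns because no worker is stuck on its send
	fmt.Println("winner:", <-results)
}
```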
If the result produced by a worker is not acceptable (e.g. an error occurred), the worker may of course decide not to send any result. Care must be taken here too: if all workers fail, no result is delivered, and the receiving party could wait forever. This can be avoided by using a timeout, or by making the workers send a result that indicates failure. The receiver then has to process those failures and keep receiving until a good result arrives or no more results are coming.
Answer:
"I know of select mechanism, but it does not care for resultOK bool."
You can define a custom struct type that combines result and resultOK, and use that type as the channel's element type. The receiver then checks resultOK on each value it receives.
Answer:
An example in code:
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	result, err := RunConcurrent(1000, TimeConsumingWork)
	fmt.Println("result =", result, "error =", err)
}

type workFn func(ctx context.Context, input any) (any, error)

// RunConcurrent executes fn on n goroutines and returns the first successful result.
func RunConcurrent(n int, fn workFn) (any, error) {
	wg := sync.WaitGroup{}
	wg.Add(n)
	resultChan := make(chan any, n) // there are potentially n results
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // this will signal the other goroutines to stop
	for i := 0; i < n; i++ {
		go func(v any) { // run fn in a new goroutine
			defer wg.Done() // signal this goroutine is done
			result, err := fn(ctx, v)
			if err != nil {
				return // a failed worker sends nothing
			}
			select { // try to store the result on resultChan; if ctx is done, stop trying
			case resultChan <- result:
			case <-ctx.Done():
			}
		}(i)
	}
	go func() {
		wg.Wait()
		close(resultChan) // DO NOT close before ALL goroutines are done: a send on a closed channel panics
	}()
	v, ok := <-resultChan // ok is false if the channel was closed without delivering any result
	if !ok {
		return nil, errors.New("no results")
	}
	return v, nil
}

func TimeConsumingWork(ctx context.Context, input any) (any, error) {
	simulatedExecutionTime := time.Duration(10+rand.Intn(50)) * time.Millisecond
	select { // do time-consuming work until the context is done
	case <-time.After(simulatedExecutionTime):
		if rand.Intn(2) == 0 {
			return nil, errors.New("simulated a random error")
		}
		return input, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}