I have a function that I want to define a maximum number of go routines, I have a list and I go through this list and I send a message to the go routines through the channel, and in this go routine I will call a function that will either get an answer or an err, when it's not an err I want to save the return in a slice, and when it's an err I want to stop the go routines and make a call. but I'm not able to make it so that when I have an error, all the go routines end, and I need the value of err
type response struct {
value string
}
func Testing() []response {
fakeValues := getFakeValues()
maxParallel := 25
wg := &sync.WaitGroup{}
wg.Add(maxParallel)
if len(fakeValues) < maxParallel {
maxParallel = len(fakeValues)
}
errReceive := make(chan error, 1)
defer close(errReceive)
response := make([]response, 0)
valuesChan := make(chan string, 1)
for i := 0; i < maxParallel; i {
go func(valuesChan <-chan string, errReceive chan error) {
for value := range valuesChan {
resp, err := getFakeResult(value)
if err != nil {
errReceive <- err
}
response = append(response, resp)
}
wg.Done()
}(valuesChan, errReceive)
}
for _, val := range fakeValues {
valuesChan <- val
}
close(valuesChan)
wg.Wait()
err := <-errReceive
if err != nil {
// make any thing
}
return response
}
func getFakeValues() []string {
return []string{"a", "b"}
}
func getFakeResult(val string) (response, error) {
if val == "a" {
return response{}, fmt.Errorf("ooh noh:%s", val)
}
return response{
value: val,
}, nil
}
CodePudding user response:
You can use a context with cancel and use it to let the go routines know they should stop.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("context is done")
return
case <-time.After(time.Second):
fmt.Println("work")
}
}
}(ctx)
time.Sleep(time.Second * 5)
cancel()
wg.Wait()
https://go.dev/play/p/qe2oDppSnaF
Here is an example that showcases it better in the context of your use case.
type result struct {
err error
val int
}
rand.Seed(time.Now().UnixNano())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rchan := make(chan result, 5)
wg := &sync.WaitGroup{}
for i := 0; i < 5; i {
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("context is done")
return
case <-time.After(time.Second):
n := rand.Intn(100)
if n > 90 {
rchan <- result{err: fmt.Errorf("error %d", n)}
} else {
rchan <- result{val: n}
}
}
}
}(ctx)
}
go func() {
wg.Wait()
close(rchan)
}()
for res := range rchan {
if res.err != nil {
fmt.Println(res.err)
cancel()
break
} else {
fmt.Println(res.val)
}
}