I have the following code: i have a list to go through and do something with a value from that list, and so i thought of using go routines, but i need to use a max number of go routines, and then in go routine i need to make a call that will get a return of response, err, when the err is different from null I need to terminate all the go routines and return an http response, and if there is no err I need to terminate the go routines and return an http response,
When I have few values it works ok, but when I have many values I have a problem, because when I call cancel I will still have go routines trying to send to the response channel that is already closed and I keep getting errors from:
goroutine 36 [chan send]:
type response struct {
value string
}
func Testing() []response {
fakeValues := getFakeValues()
maxParallel := 25
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if len(fakeValues) < maxParallel {
maxParallel = len(fakeValues)
}
type responseChannel struct {
Response response
Err error
}
reqChan := make(chan string, 1)
resChan := make(chan responseChannel)
wg := &sync.WaitGroup{}
wg.Add(maxParallel)
for i := 0; i < maxParallel; i {
go func(ctx context.Context, ch chan string, resChan chan responseChannel) {
for val := range ch {
resp, err := getFakeResult(val)
resChan <- responseChannel{
Response: resp,
Err: err,
}
}
wg.Done()
}(ctx, reqChan, resChan)
}
for _, body := range fakeValues {
reqChan <- body
}
close(reqChan)
go func() {
wg.Wait()
close(resChan)
}()
var hasErr error
response := make([]response, 0, len(fakeValues))
for res := range resChan {
if res.Err != nil {
hasErr = res.Err
cancel()
break
}
response = append(response, res.Response)
}
if hasErr != nil {
return responses.ErrorResponse(hasErr) // returns http response
}
return responses.Accepted(response, nil) // returns http response
}
func getFakeValues() []string {
return []string{"a", "b"}
}
func getFakeResult(val string) (response, error) {
if val == "a" {
return response{}, fmt.Errorf("ooh noh:%s", val)
}
return response{
value: val,
}, nil
}
CodePudding user response:
The workers end up blocked on sending to resChan because it's not buffered, and after an error, nothing reads from it.
You can either make resChan buffered, with a size at least as large as maxParallel. Or check to see if the context was canceled, e.g. change the resChan <-
to
select {
case resChan <- responseChannel{
Response: resp,
Err: err,
}:
case <-ctx.Done():
}
CodePudding user response:
There are two main problems with your solution:
First, if your fakeValues
slice has more items than maxParallel
1, your program will block on this part:
for _, body := range fakeValues {
reqChan <- body
}
How does this happen? As you start putting values in reqChan
, each started goroutine will read one value from the reqChan
and try to write the response to resChan
. But, since resChan
is still not reading responses, each goroutine will block there (writing to resChan
). Eventually, once each goroutine is blocked, reading from the reqChan
is blocked as well and you cannot put any more values in it (apart from one buffered value).
Second, you are passing the context to your goroutines, but you are not doing anything with it. You can use ctx.Done()
channel to get a signal to exit the goroutine. Something like this:
go func(ctx context.Context, ch chan string, resChan chan responseChannel) {
for {
select {
case val := <-ch:
resp, err := getFakeResult(val)
resChan <- responseChannel{
Response: resp,
Err: err,
}
case <- ctx.Done():
return
}
}
}(ctx, reqChan, resChan)
Now, to tie everything together so that there are no deadlocks, no race conditions, and no situations where values are not processed, a few other changes need to be made. I've posted the entire code below.
func Testing() []response {
fakeValues := getFakeValues()
maxParallel := 25
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if len(fakeValues) < maxParallel {
maxParallel = len(fakeValues)
}
type responseChannel struct {
Response response
Err error
}
reqChan := make(chan string) //make this an unbuffered channel
resChan := make(chan responseChannel)
wg := &sync.WaitGroup{}
wg.Add(maxParallel)
for i := 0; i < maxParallel; i {
go func(ctx context.Context, ch chan string, resChan chan responseChannel) {
for {
select {
case val := <-ch:
resp, err := getFakeResult(val)
resChan <- responseChannel{
Response: resp,
Err: err,
}
case <- ctx.Done():
wg.Done()
return
}
}
wg.Done()
}(ctx, reqChan, resChan)
}
go func() {
for _, body := range fakeValues {
reqChan <- body
}
close(reqChan)
//putting cancel here so that it can terminate all goroutines when all values are read from reqChan
cancel()
}()
go func() {
wg.Wait()
close(resChan)
}()
var hasErr error
response := make([]response, 0, len(fakeValues))
for res := range resChan {
if res.Err != nil {
hasErr = res.Err
cancel()
break
}
response = append(response, res.Response)
}
if hasErr != nil {
return responses.ErrorResponse(hasErr) // returns http response
}
return responses.Accepted(response, nil) // returns http response
}
In short, the changes are:
reqChan
is an unbuffered channel, as this will help in cases where values might not get processed when we close goroutines that read data from buffered channels.- worker goroutines have been changed to accommodate the cases of both exiting when error happens and when there is no more data from
reqChan
to process.wg.Done()
is executed when the context is canceled to ensure thatresChan
is eventually closed. - separate goroutine is created to put the data in the
reqChan
without blocking the program, close it afterward, and cancel the context.