I'm learning Go concurrency and wrote the obligatory worker pool example, where there are N pieces of work and M workers (N > M). I'm running into a deadlock (all goroutines are asleep
), which I can't figure out; however, I'm also getting the expected output before the deadlock occurs, which has me even more confused. Can someone please point out the things I'm doing wrong?
My code is this:
package main
import (
"fmt"
"sync"
)
// A simple worker pool implementation using channels and WaitGroups.
// Our workers simply read from a channel of integers from an input
// channel and write their squares to an output channel.
func addJobs(jobsCh chan<- int, wg *sync.WaitGroup) {
// 100 numbers to crunch (jobs)
for i := 1; i < 101; i {
jobsCh <- i
}
wg.Done()
}
func worker(jobsCh <-chan int, resultsCh chan<- int, wg2 *sync.WaitGroup) {
for num := range jobsCh {
resultsCh <- num * num
}
wg2.Done()
}
func addWorkers(jobsCh <-chan int, resultsCh chan<- int, wg *sync.WaitGroup) {
var wg2 sync.WaitGroup
// 10 workers
for i := 0; i < 10; i {
wg2.Add(1)
go worker(jobsCh, resultsCh, &wg2)
}
wg.Done()
}
func readResults(resultsCh <-chan int, wg *sync.WaitGroup) {
for sq := range resultsCh {
fmt.Printf("%v ", sq)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
jobsCh := make(chan int)
resultsCh := make(chan int)
wg.Add(1)
go addJobs(jobsCh, &wg)
wg.Add(1)
go addWorkers(jobsCh, resultsCh, &wg)
wg.Add(1)
go readResults(resultsCh, &wg)
wg.Wait()
}
This prints the squares of the numbers (in random order), as expected, but also runs into a deadlock. Please see this playground link. :(
CodePudding user response:
Close jobsCh
to cause workers to exit:
func addJobs(jobsCh chan<- int, wg *sync.WaitGroup) {
// 100 numbers to crunch (jobs)
for i := 1; i < 101; i {
jobsCh <- i
}
close(jobsCh) // <-- add this line
wg.Done()
}
After workers are done, close resultsCh
to cause results loop to exit:
func addWorkers(jobsCh <-chan int, resultsCh chan<- int, wg *sync.WaitGroup) {
var wg2 sync.WaitGroup
// 10 workers
for i := 0; i < 10; i {
wg2.Add(1)
go worker(jobsCh, resultsCh, &wg2)
}
wg2.Wait() // <-- add this line
close(resultsCh) // and this line
wg.Done()
}