In the code hereunder, I don't understand why the "Worker" methods seem to exit instead of pulling values from the input channel "in" and processing them.
I had assumed they would only return after having consumed all input from the input channel "in" and processing them
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
i int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
for item := range in {
item *= item // returns the square of the input value
fmt.Printf("=> %d: %d\n", id, item)
out <- Result{item, id}
}
wg.Done()
fmt.Printf("%d exiting ", id)
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id {
fmt.Printf("Starting : %d\n", id)
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // wait for all workers to complete their tasks
close(out) // close the output channel when all tasks are completed
}
const (
NW = 4
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 100; i {
in <- i
}
close(in)
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out : %d: %d", item.i, item.val)
}
}
The output is
Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!
CodePudding user response:
fatal error: all goroutines are asleep - deadlock!
The full error shows where each goroutine is "stuck". If you run this in the playground, it will even show you the line number. That made it easy for me to diagnose.
Your Run_parallel
runs in the main
groutine, so before main
can read from out
, Run_parallel
must return. Before Run_parallel
can return, it must wg.Wait()
. But before the workers call wg.Done()
, they must write to out
. That's what causes a deadlock.
One solution is simple: just run Run_parallel
concurrently in its own Goroutine.
go Run_parallel(NW, in, out, Worker)
Now, main
ranges over out
, waiting on out
s closure to signal completion. Run_parallel
waits for the workers with wg.Wait()
, and the workers will range over in
. All the work will get done, and the program won't end until it's all done. (https://go.dev/play/p/oMrgH2U09tQ)
CodePudding user response:
Alternative formulation of the solution:
In that alternative formulation , it is not necessary to start Run_parallel as a goroutine (it triggers its own goroutine). I prefer that second solution, because it automates the fact that Run_parallel() has to run parallel to the main function. Also, for the same reason it's safer, less error-prone (no need to remember to run Run_parallel with the go keyword).
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // returns the double of the input value (Bogus handling of data)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
go func() {
wg := sync.WaitGroup{}
defer close(out) // close the output channel when all tasks are completed
for id := 0; id < n_workers; id {
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // wait for all workers to complete their tasks *and* trigger the -differed- close(out)
}()
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
defer close(in)
for i := 0; i < 10; i {
in <- i
}
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %d\n", item.id, item.val)
}
println("- - - All done - - -")
}
CodePudding user response:
Solution :
Run_parallel has to run in it’s own goroutine:
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // returns the double of the input value (Bogus handling of data)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id {
wg.Add(1)
go Worker(in, out, id, &wg)
}
wg.Wait() // wait for all workers to complete their tasks
close(out) // close the output channel when all tasks are completed
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 10; i {
in <- i
}
close(in)
}()
go Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %d\n", item.id, item.val)
}
println("- - - All done - - -")
}