I am new to Go concurrency and have been trying to understand the code below.
I see a few things that I cannot explain:
When the loop in the main function runs while i is less than or equal to 100000,
for i <= 100000 {
the last two statements sometimes print different values for nResults and jobWrites:
fmt.Printf("number of result writes %d\n", nResults)
fmt.Printf("Number of job writes %d\n", jobWrites)
When the loop limit is more than 1000000, the program fails with
panic: send on closed channel
How can I make sure that values are never sent to jobs after it has been closed, and that results can be closed once all values have been received, without a deadlock?
package main

import (
	"fmt"
	"sync"
)

func worker(wg *sync.WaitGroup, id int, jobs <-chan int, results chan<- int, countWrites *int64) {
	defer wg.Done()
	for j := range jobs {
		*countWrites += 1
		go func(j int) {
			if j%2 == 0 {
				results <- j * 2
			} else {
				results <- j
			}
		}(j)
	}
}

func main() {
	wg := &sync.WaitGroup{}
	jobs := make(chan int)
	results := make(chan int)
	var i int = 1
	var jobWrites int64 = 0
	for i <= 10000000 {
		go func(j int) {
			if j%2 == 0 {
				i += 99
				j += 99
			}
			jobWrites += 1
			jobs <- j
		}(i)
		i += 1
	}
	var nResults int64 = 0
	for w := 1; w < 1000; w++ {
		wg.Add(1)
		go worker(wg, w, jobs, results, &nResults)
	}
	close(jobs)
	wg.Wait()
	var sum int32 = 0
	var count int64 = 0
	for r := range results {
		count += 1
		sum += int32(r)
		if count == nResults {
			close(results)
		}
	}
	fmt.Println(sum)
	fmt.Printf("number of result writes %d\n", nResults)
	fmt.Printf("Number of job writes %d\n", jobWrites)
}
Answer:
There are quite a few problems in your code.
Sending on a closed channel
One general principle of using Go channels is:
"don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders"
(https://go101.org/article/channel-closing.html)
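In your code, main calls close(jobs) right after starting the workers, while many of the sender goroutines have not sent their value yet. Any of them that sends after the close triggers the panic.
The solution for you is simple: don't have multiple concurrent senders, and then you can close the channel from the sender side.
Instead of starting millions of goroutines, one for each job you add to the channel, run a single goroutine that executes the whole loop and adds all jobs to the channel, then closes the channel after the loop. The workers will consume from the channel as fast as they can.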
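As a minimal sketch of the single-sender pattern described above: produceJobs is a hypothetical helper name (it is not in the original code), and the job values 1..n are an assumption for illustration. One goroutine runs the whole loop and is the only place the channel is closed.

```go
package main

import "fmt"

// produceJobs is a hypothetical helper: it starts ONE goroutine that sends
// the values 1..n on an unbuffered channel and closes it after the last send.
func produceJobs(n int) <-chan int {
	jobs := make(chan int)
	go func() {
		// The only sender closes the channel, so no send can ever
		// happen after the close.
		defer close(jobs)
		for i := 1; i <= n; i++ {
			jobs <- i
		}
	}()
	return jobs
}

func main() {
	count := 0
	// The range loop ends when the producer closes the channel.
	for range produceJobs(1000) {
		count++
	}
	fmt.Println("jobs received:", count)
}
```

Because only the producer goroutine sends, the close cannot race with another send, which is what the "send on closed channel" panic comes from.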
Data races from modifying shared variables in multiple goroutines
You're modifying two shared variables without taking special steps:
1. nResults, which you pass to the countWrites *int64 parameter in the worker.
2. i in the loop that writes to the jobs channel: you're adding 99 to it from multiple goroutines, making it unpredictable how many values you actually write to the jobs channel.
To solve 1, there are many options, including using sync.Mutex. However, since you're just adding to it, the easiest solution is to use atomic.AddInt64(countWrites, 1) from the sync/atomic package instead of *countWrites += 1.
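To illustrate the atomic counter in isolation, here is a small sketch. countConcurrently is a hypothetical helper and the goroutine counts are arbitrary; only atomic.AddInt64 and atomic.LoadInt64 come from the standard library.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countConcurrently is a hypothetical helper: it starts `goroutines`
// goroutines that each increment one shared counter `perGoroutine` times.
func countConcurrently(goroutines, perGoroutine int) int64 {
	var counter int64
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for k := 0; k < perGoroutine; k++ {
				// Atomic increment; a plain counter += 1 here
				// would be a data race and could lose updates.
				atomic.AddInt64(&counter, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countConcurrently(100, 1000))
}
```

Running the plain-increment variant with go run -race should report the race, which is a quick way to confirm the diagnosis on your own code.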
To solve 2, don't use one goroutine per write to the channel. Use a single goroutine for the entire loop (see above).
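Putting both fixes together, one possible shape for the whole program is sketched below. It is not the only way to do this and it makes some assumptions that differ from your code: run is a hypothetical driver function, workers send results directly instead of starting a goroutine per result, the sum is kept in an int64, and results is closed by a small helper goroutine once every worker has returned, rather than by the receiving loop.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker reads jobs until the jobs channel is closed, counts each job
// atomically, and sends exactly one result per job.
func worker(wg *sync.WaitGroup, jobs <-chan int, results chan<- int, countWrites *int64) {
	defer wg.Done()
	for j := range jobs {
		atomic.AddInt64(countWrites, 1)
		if j%2 == 0 {
			results <- j * 2
		} else {
			results <- j
		}
	}
}

// run is a hypothetical driver: it feeds the jobs 1..nJobs to nWorkers
// workers and returns the sum of results, the number of results read,
// and the number of jobs the workers counted.
func run(nJobs, nWorkers int) (int64, int64, int64) {
	jobs := make(chan int)
	results := make(chan int)
	var nResults int64

	// Single sender for jobs: it alone closes the channel.
	go func() {
		defer close(jobs)
		for i := 1; i <= nJobs; i++ {
			jobs <- i
		}
	}()

	wg := &sync.WaitGroup{}
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go worker(wg, jobs, results, &nResults)
	}

	// Close results only after every worker (every sender) has returned.
	// This runs in its own goroutine so main can keep receiving meanwhile.
	go func() {
		wg.Wait()
		close(results)
	}()

	var sum, count int64
	for r := range results {
		count++
		sum += int64(r)
	}
	return sum, count, atomic.LoadInt64(&nResults)
}

func main() {
	sum, count, nResults := run(1000, 10)
	fmt.Println(sum)
	fmt.Printf("number of results read %d\n", count)
	fmt.Printf("number of result writes %d\n", nResults)
}
```

The key design choice is that main starts receiving from results before waiting on the workers. Calling wg.Wait() in main before the receive loop, with an unbuffered results channel, would leave the workers blocked on their sends and main blocked on the wait.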