Home > database >  Gracefully closing channel and not sending on closed channel
Gracefully closing channel and not sending on closed channel

Time:11-18

I am new to Golang concurrency and have been working to understand this piece of code mentioned below.

I witness few things which I am unable to explain why it happens:

  1. when using i smaller than equal to 100000 for i <= 100000 { in main function, it sometimes prints different values for nResults and countWrites (in last two statements) fmt.Printf("number of result writes %d\n", nResults) fmt.Printf("Number of job writes %d\n", jobWrites)

  2. when using i more than 1000000 it gives panic: send on closed channel

How can I make sure that the values send to jobs is not on closed channel and later after all values are received in results we can close the channel without deadlock?

package main

import (
    "fmt"
    "sync"
)

func worker(wg *sync.WaitGroup, id int, jobs <-chan int, results chan<- int, countWrites *int64) {
    defer wg.Done()
    for j := range jobs {
        *countWrites  = 1
        go func(j int) {
            if j%2 == 0 {
                results <- j * 2
            } else {
                results <- j
            }
        }(j)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    jobs := make(chan int)
    results := make(chan int)
    var i int = 1
    var jobWrites int64 = 0
    for i <= 10000000 {
        go func(j int) {
            if j%2 == 0 {
                i  = 99
                j  = 99
            }
            jobWrites  = 1
            jobs <- j
        }(i)
        i  = 1
    }

    var nResults int64 = 0
    for w := 1; w < 1000; w   {
        wg.Add(1)
        go worker(wg, w, jobs, results, &nResults)
    }

    close(jobs)
    wg.Wait()

    var sum int32 = 0
    var count int64 = 0
    for r := range results {
        count  = 1
        sum  = int32(r)
        if count == nResults {
            close(results)
        }
    }
    fmt.Println(sum)
    fmt.Printf("number of result writes %d\n", nResults)
    fmt.Printf("Number of job writes %d\n", jobWrites)
}

CodePudding user response:

Quite a few problems in your code.

Sending on closed channel

One general principle of using Go channels is

don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders

(https://go101.org/article/channel-closing.html)

The solution for you is simple: don't have multiple concurrent senders, and then you can close the channel from the sender side.

Instead of starting millions of separate goroutine for each job you add to the channel, run one goroutine that executes the whole loop to add all jobs to the channel. And close the channel after the loop. The workers will consume the channel as fast as they can.

Data races by modifying shared variables in multiple goroutines

You're modifying two shared variables without taking special steps:

  1. nResults, which you pass to the countWrites *int64 in the worker.
  2. i in the loop that writes to the jobs channel: you're adding 99 to it from multiple goroutines, making it unpredictable how many values you actually write to the jobs channel

To solve 1, there are many options, including using sync.Mutex. However since you're just adding to it, the easiest solution is to use atomic.AddInt64(countWrites, 1) instead of *countWrites = 1

To solve 2, don't use one goroutine per write to the channel, but one goroutine for the entire loop (see above)

  • Related