I want to make X number of goroutines to update CountValue
using parallelism (numRoutines are as much as how many CPU you have).
Solution 1:
func count(numRoutines int) (countValue int) {
var mu sync.Mutex
k := func(i int) {
mu.Lock()
defer mu.Unlock()
countValue = 5
}
for i := 0; i < numRoutines; i {
go k(i)
}
It becomes a data race and the returned countValue = 0
.
Solution 2:
func count(numRoutines int) (countValue int) {
k := func(i int, c chan int) {
c <- 5
}
c := make(chan int)
for i := 0; i < numRoutines; i {
go k(i, c)
}
for i := 0; i < numRoutines; i {
countValue = <- c
}
return
}
I did a benchmark test on it and doing a sequential addition would work faster than using goroutines. I think it's because I have two for loops here as when I put countValue = <- c
inside the first for loop, the code runs faster.
Solution 3:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i {
wg.Add(1)
go k(i)
}
go func() {
for i := range c {
countValue = i
}
}()
wg.Wait()
return
}
Still a race count :/
Is there any way better to do this?
CodePudding user response:
There definitely is a better way to safely increment a variable: using sync/atomic
:
import "sync/atomic"
var words int64
k := func() {
_ = atomic.AddInt64(&words, 5) // increment atomically
}
Using a channel basically eliminates the need for a mutex, or takes away the the risk of concurrent access to the variable itself, and a waitgroup here is just a bit overkill
Channels:
words := 0
done := make(chan struct{}) // or use context
ch := make(chan int, numRoutines) // buffer so each routine can write
go func () {
read := 0
for i := range ch {
words = 5 // or use i or something
read
if read == numRoutines {
break // we've received data from all routines
}
}
close(done) // indicate this routine has terminated
}()
for i := 0; i < numRoutines; i {
ch <- i // write whatever value needs to be used in the counting routine on the channel
}
<- done // wait for our routine that increments words to return
close(ch) // this channel is no longer needed
fmt.Printf("Counted %d\n", words)
As you can tell, the numRoutines
no longer is the number of routines, but rather the number of writes on the channel. You can move that to individual routines, still:
for i := 0; i < numRoutines; i {
go func(ch chan<- int, i int) {
// do stuff here
ch <- 5 * i // for example
}(ch, i)
}
WaitGroup:
Instead of using a context that you can cancel, or a channel, you can use a waitgroup atomic to get the same result. The easiest way IMO to do so is to create a type:
type counter struct {
words int64
}
func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
defer wg.Done()
_ = atomic.AddInt64(&c.words, i * 5) // whatever value you need to add
}
func main () {
cnt := counter{}
wg := sync.WaitGroup{}
wg.Add(numRoutines) // create the waitgroup
for i := 0; i < numRoutines; i {
go cnt.doStuff(&wg, i)
}
wg.Wait() // wait for all routines to finish
fmt.Println("Counted %d\n", cnt.words)
}
Fix for your third solution
As I mentioned in the comment: your third solution is still causing a race condition because the channel c
is never closed, meaning the routine:
go func () {
for i := range c {
countValue = i
}
}()
Never returns. The waitgroup also only ensures that you've sent all values on the channel, but not that the countValue
has been incremented to its final value. The fix would be to either close the channel after wg.Wait()
returns so the routine can return, and add a done
channel that you can close when this last routine returns, and add a <-done
statement before returning.
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i {
wg.Add(1)
go k(i)
}
done := make(chan struct{})
go func() {
for i := range c {
countValue = i
}
close(done)
}()
wg.Wait()
close(c)
<-done
return
}
This adds some clutter, though, and IMO is a bit messy. It might just be easier to just move the wg.Wait()
call to a routine:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
// add wg as argument, makes it easier to move this function outside of this scope
k := func(wg *sync.WaitGroup, i int) {
defer wg.Done()
c <- 5
}
wg.Add(numShards) // increment the waitgroup once
for i := 0; i < numShards; i {
go k(&wg, i)
}
go func() {
wg.Wait()
close(c) // this ends the loop over the channel
}()
// just iterate over the channel until it is closed
for i := range c {
countValue = i
}
// we've added all values to countValue
return
}