Can we have multiple go routines listening to multiple channels facing issuing in printing all the problems.
I am not able to print all the numbers how can I improved this piece of code
If possible can anyone provide some example as I am struggling with this example.
Is time.sleep needed after every go routine
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
var count string
func worker3(var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var3 {
count = ch " "
}
}
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
func main() {
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
var wg sync.WaitGroup
count = ""
for i := 0; i < 15; i {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
for i := 0; i < 15; i {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
for i := 0; i < 15; i {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
for i := 0; i <= 100000; i {
var1 <- strconv.Itoa(i)
}
time.Sleep(time.Second)
wg.Wait()
fmt.Println(count)
}
CodePudding user response:
Yes, it's complicated, But there are a couple of rules of thumb that should make things feel much more straightforward.
- prefer using formal arguments for the channels you pass to go-routines instead of accessing channels in global scope. You can get more compiler checking this way, and better modularity too.
- avoid both reading and writing on the same channel in a particular go-routine (including the 'main' one). Otherwise, deadlock is a much greater risk.
CodePudding user response:
Let's see what your program doing. You first initialized three buffered channels var1, var2, var3
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
Now you initialized one WaitGroup (wg)
var wg sync.WaitGroup
Now you defined variable count and that variable is empty string
count = ""
The next part is for a loop that goes from 0 to 15 and generates 15 worker1 go routines
for i := 0; i < 15; i {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
Each time you start one worker1 go routine and pass channels and pointer to waitgroup (wg) in worker1.
But what worker1 will do?
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
worker1 will wait for that in channel var1 takes data from that channel and pass it to channel var2.
This is fine. You definitely don’t need this time.Sleep(time.Second).
let's go next.
You now create a new loop that will generate another 15 go routines (worker2).
for i := 0; i < 15; i {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
worker2 will take everything from channel var2 and pass it to the channel var3
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
now you create another 15 go routines for worker3.
for i := 0; i < 15; i {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
and worker3 takes everything from channel var3 processing that data by appending it to count string
The last piece of code is the seeder of data to the channels. That loop goes from 0 - 100000 and for each number convert them in string and pass it to the channel var1
next program will waiting to all go routine finished and print result.
Ok, there are a few problems with this code.
- you definitely don’t need this time.Sleep(time.Second) before each go routine and also you don’t need time.Sleep before wg.Wait().
- Buffered channels are not needed for this type of workload. This is a simple pipeline, you can use unbuffered channels, and that channels will be used for synchronization between tasks.
When you change you code to use unbuffered channels and remove these time.Sleep you still have a problem. And problem is that go lang runtime show’s error:
fatal error: all goroutines are asleep - deadlock!
and terminate a code.
But why does this happen, we have sync.WaitGroup and everything looks fine. Let’s see a simpler program that has the same error.
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i {
var1 <- strconv.Itoa(i)
}
wg.Wait()
}
This code will also produce the same error as your code. This is because these channels never are closed and go routines (workers) will wait forever for new data in the channel. Go runtime detect that and kill the process.
To prevent this type of error we need to add some mechanism to tell gorutine that we are done and go routine can stop listening on that channel and correctly finished.
The easiest way to send that signal is to close the channel that is read by that goroutine. This is a code that fixed the problem.
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i {
var1 <- strconv.Itoa(i)
}
close(var1)
wg.Wait()
}
and this code won’t produce errors. This code would be correctly terminated.
But there is a trick. How you can do that in your code? there are 15 go routine that read from var1 channel, 15 goroutine that read from var2 channel, and 15 from var3 channel.
It’s hard to know when you can close which channel. We know for channel var1 that processes data first so we can close them when the producer finishes with insertions data in a synchronous channel. The reason for that is we can’t insert new data into the channel until previous data is read. So when producer inserts all data we know all data on channel var1 are processed so it’s safe to close the channel. But what with channels var2 and var3.
There are 15 different go routine that wait for channel var2 and 15 for var3, these means we need to find a way to close var2 when all processing on var2 is done (in all goroutines worker1 ), and the same for var3. That can be done by creating two additional goroutine
wg1 and wg2 and use that goroutine to spawn goroutine for worker1 and worker2, these goroutine will work as an orchestrator, inside of these functions we create new sync.Group only for worker1 and worker2 and these functions will know when all of these children goroutines are finished. So for wg1 when all these worker1 goroutines are finished we can safely close var2 channel. Same for wg2 and var3 channel.
these are wg1 and wg2 functions
// wg1
go func() {
log.Printf("Starting WG1 master go routine")
wg.Add(1)
var wg1 sync.WaitGroup
defer func() {
close(var2)
wg.Done()
}()
for i := 0; i < 15; i {
wg1.Add(1)
go worker1(var1, var2, &wg1)
}
wg1.Wait()
}()
// wg2
go func() {
log.Printf("Starting WG2 routine for second stage")
wg.Add(1)
defer func() {
close(var3)
wg.Done()
}()
var wg2 sync.WaitGroup
for i := 0; i < 15; i {
wg2.Add(1)
go worker2(var2, var3, &wg2)
}
wg2.Wait()
}()
Full working code you can find on