I'm trying to implement the bridge pattern, following the Go Concurrency book:
    func bridge_impl() {
        done := make(chan interface{})
        defer close(done)
        var wg sync.WaitGroup
        bridge := func(
            done <-chan interface{},
            chanStream <-chan <-chan interface{},
        ) <-chan interface{} {
            valStream := make(chan interface{})
            go func() {
                wg.Add(1)
                defer close(valStream)
                for {
                    var stream <-chan interface{}
                    select {
                    case maybeStream, ok := <-chanStream:
                        fmt.Println("works")
                        if ok == false {
                            return
                        }
                        stream = maybeStream
                    case <-done:
                        return
                    }
                    for val := range stream {
                        select {
                        case valStream <- val:
                        case <-done:
                        }
                    }
                }
            }()
            return valStream
        }
        genVals := func() <-chan <-chan interface{} {
            chanStream := make(chan (<-chan interface{}))
            go func() {
                wg.Add(1)
                defer close(chanStream)
                for i := 0; i < 10; i++ {
                    stream := make(chan interface{})
                    stream <- i
                    close(stream)
                    chanStream <- stream
                }
            }()
            return chanStream
        }
        for v := range bridge(done, genVals()) {
            fmt.Printf("%v ", v)
        }
        wg.Wait()
    }
However, I'm receiving a deadlock error: all goroutines are asleep - deadlock!
At first I thought I should add a WaitGroup, even though the book example doesn't use one, but I ended up with the same error.
CodePudding user response:
From what I understand, you do not need a WaitGroup at all. You just need to reorder the statements in the genVals function's loop so that the stream is sent on chanStream before a value is written to it:
    for i := 0; i < 10; i++ {
        stream := make(chan interface{})
        chanStream <- stream
        stream <- i
        close(stream)
    }
https://go.dev/play/p/7D9OzrsvZyi
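For reference, here is a minimal, self-contained sketch of the reordered version without a WaitGroup. It is not necessarily identical to the playground link above: bridge and genVals are lifted to top-level functions, and the parameter n on genVals is an assumption added for illustration.

```go
package main

import "fmt"

// bridge flattens a channel of channels into a single stream of values.
func bridge(done <-chan interface{}, chanStream <-chan (<-chan interface{})) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			var stream <-chan interface{}
			select {
			case maybeStream, ok := <-chanStream:
				if !ok {
					return // no more streams
				}
				stream = maybeStream
			case <-done:
				return
			}
			for val := range stream {
				select {
				case valStream <- val:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

// genVals emits n streams, each carrying a single value.
// The parameter n is an assumption; the question hard-codes 10.
func genVals(n int) <-chan (<-chan interface{}) {
	chanStream := make(chan (<-chan interface{}))
	go func() {
		defer close(chanStream)
		for i := 0; i < n; i++ {
			stream := make(chan interface{})
			chanStream <- stream // hand the stream to bridge first
			stream <- i          // bridge is now ranging over stream, so this send can complete
			close(stream)
		}
	}()
	return chanStream
}

func main() {
	done := make(chan interface{})
	defer close(done)
	for v := range bridge(done, genVals(10)) {
		fmt.Printf("%v ", v)
	}
	fmt.Println()
}
```

Because the streams are produced and consumed one at a time, the values arrive in order, 0 through 9.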
CodePudding user response:
There are two main issues.
Working example
First issue:
    for i := 0; i < 10; i++ {
        stream := make(chan interface{})
        stream <- i
        close(stream)
        chanStream <- stream
    }
This writes to an unbuffered channel right after creating it, while no goroutine is reading from it, so stream <- i blocks forever. Use a buffered channel or send from another goroutine.
    stream := make(chan interface{}, 1) // buffer size 1 to not block `stream <- i`
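To show the buffered-channel option in isolation, here is a small sketch. The helper name singleValueStream is made up for this example and does not appear in the question.

```go
package main

import "fmt"

// singleValueStream (hypothetical helper) returns a closed channel that
// still holds one value. The buffer of size 1 lets the send complete
// even though no receiver is ready yet.
func singleValueStream(v interface{}) <-chan interface{} {
	stream := make(chan interface{}, 1)
	stream <- v   // does not block: the value goes into the buffer
	close(stream) // buffered values can still be received after close
	return stream
}

func main() {
	for v := range singleValueStream(42) {
		fmt.Println(v)
	}
}
```

With this approach the original statement order in genVals can stay as it was, since the send no longer waits for the bridge goroutine.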
Second issue:
Using wg.Add(1) without wg.Done(). You can use defer in both cases.
    wg.Add(1)
    defer wg.Done()
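A minimal sketch of the Add/Done pairing follows. The function runWorkers and its counter are assumptions made up for illustration. Note that it calls wg.Add(1) before the go statement rather than inside the goroutine as in the question; otherwise wg.Wait() may run before the goroutine has registered itself.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers (hypothetical) starts n goroutines, waits for all of them,
// and returns how many of them ran.
func runWorkers(n int) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		count int
	)
	for i := 0; i < n; i++ {
		wg.Add(1) // register before the goroutine starts
		go func() {
			defer wg.Done() // always paired with the Add above
			mu.Lock()
			count++
			mu.Unlock()
		}()
	}
	wg.Wait() // returns once every goroutine has called Done
	return count
}

func main() {
	fmt.Println(runWorkers(3))
}
```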