When I ran the program below, I got an error
davecheney tweets about golang
beertocode does not tweet about golang
ironzeb tweets about golang
beertocode tweets about golang
vampirewalk666 tweets about golang
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000010260?)
/usr/local/go/src/runtime/sema.go:56 0x25
sync.(*WaitGroup).Wait(0x100c000058058?)
/usr/local/go/src/sync/waitgroup.go:136 0x52
main.main()
/home/joe/go/src/github.com/go-concurrency-exercises/1-producer-consumer/main.go:53 0x14f
Where is the deadlock coming from and how to improve the program to avoid that?
package main
import (
"fmt"
"sync"
"time"
)
func producer(stream Stream, tweetChan chan *Tweet) {
for {
tweet, err := stream.Next()
if err == ErrEOF {
close(tweetChan)
return
}
tweetChan <- tweet
//tweets = append(tweets, tweet)
}
}
func consumer(tweetChan chan *Tweet) {
for t := range tweetChan {
if t.IsTalkingAboutGo() {
fmt.Println(t.Username, "\ttweets about golang")
} else {
fmt.Println(t.Username, "\tdoes not tweet about golang")
}
}
}
func main() {
start := time.Now()
stream := GetMockStream()
var wg sync.WaitGroup
tweetChan := make(chan *Tweet)
// Producer
//tweets := producer(stream)
wg.Add(2)
go producer(stream, tweetChan)
// Consumer
//consumer(tweets)
go consumer(tweetChan)
wg.Wait()
fmt.Printf("Process took %s\n", time.Since(start))
}
If you need to see mockstream.go, refer to https://github.com/loong/go-concurrency-exercises/tree/master/1-producer-consumer
My program is a concurrent version of the original program by modifying main.go
CodePudding user response:
The call to wg.Wait() is waiting until the group's counter is zero, but there are no running goroutines to decrement the counter.
Fix by calling wg.Done() before returning from the goroutine functions:
func producer(wg *sync.WaitGroup, stream Stream, tweetChan chan *Tweet) {
defer wg.Done()
for {
tweet, err := stream.Next()
if err == ErrEOF {
close(tweetChan)
return
}
tweetChan <- tweet
}
}
func consumer(wg *sync.WaitGroup, tweetChan chan *Tweet) {
defer wg.Done()
for t := range tweetChan {
if t.IsTalkingAboutGo() {
fmt.Println(t.Username, "\ttweets about golang")
} else {
fmt.Println(t.Username, "\tdoes not tweet about golang")
}
}
}
func main() {
start := time.Now()
stream := GetMockStream()
var wg sync.WaitGroup
tweetChan := make(chan *Tweet)
wg.Add(2)
go producer(&wg, stream, tweetChan)
go consumer(&wg, tweetChan)
wg.Wait()
fmt.Printf("Process took %s\n", time.Since(start))
}