This is the first time I'm using the concurrency features of Go and I'm jumping right into the deep end.
I want to make concurrent calls to an API. The request is based off of the tags of the posts I want to receive back (there can be 1 <= N tags). The response body looks like this:
{
"posts": [
{
"id": 1,
"author": "Name",
"authorId": 1,
"likes": num_likes,
"popularity": popularity_decimal,
"reads": num_reads,
"tags": [ "tag1", "tag2" ]
},
...
]
}
My plan is to daisy-chain a bunch of channels together and spawn a number of goroutines that read and or write from those channels:
- for each tag, add it to a tagsChannel inside a goroutine
- use that tagsChannel inside another goroutine to make concurrent GET requests to the endpoint
- for each response of that request, pass the underlying slice of posts into another goroutine
- for each individual post inside the slice of posts, add the post to a postChannel
- inside another goroutine, iterate over postChannel and insert each post into a data structure
Here's what I have so far:
func (srv *server) Get() {
// Using red-black tree prevents any duplicates, fast insertion
// and retrieval times, and is sorted already on ID.
rbt := tree.NewWithIntComparator()
// concurrent approach
tagChan := make(chan string) // tags -> tagChan
postChan := make(chan models.Post) // tagChan -> GET -> post -> postChan
errChan := make(chan error) // for synchronizing errors across goroutines
wg := &sync.WaitGroup{} // for synchronizing goroutines
wg.Add(4)
// create a go func to synchronize our wait groups
// once all goroutines are finished, we can close our errChan
go func() {
wg.Wait()
close(errChan)
}()
go insertTags(tags, tagChan, wg)
go fetch(postChan, tagChan, errChan, wg)
go addPostToTree(rbt, postChan, wg)
for err := range errChan {
if err != nil {
srv.HandleError(err, http.StatusInternalServerError).ServeHTTP(w, r)
}
}
}
// insertTags inserts user's passed-in tags to tagChan
// so that tagChan may pass those along in fetch.
func insertTags(tags []string, tagChan chan<- string, group *sync.WaitGroup) {
defer group.Done()
for _, tag := range tags {
tagChan <- tag
}
close(tagChan)
}
// fetch completes a GET request to the endpoint
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {
defer group.Done()
for tag := range tags {
ep, err := formURL(tag)
if err != nil {
errs <- err
}
group.Add(1) // QUESTION should I use a separate wait group here?
go func() {
resp, err := http.Get(ep.String())
if err != nil {
errs <- err
}
container := models.PostContainer{}
err = json.NewDecoder(resp.Body).Decode(&container)
defer resp.Body.Close()
group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
go insertPosts(posts, container.Posts, group)
defer group.Done()
}()
// group.Done() -- removed this call due to Burak, but now my program hands
}
}
// insertPosts inserts each individual post into our posts channel so that they may be
// concurrently added to our RBT.
func insertPosts(posts chan<- models.Post, container []models.Post, group *sync.WaitGroup) {
defer group.Done()
for _, post := range container {
posts <- post
}
}
// addPostToTree iterates over the channel and
// inserts each individual post into our RBT,
// setting the post ID as the node's key.
func addPostToTree(tree *tree.RBT, collection <-chan models.Post, group *sync.WaitGroup) {
defer group.Done()
for post := range collection {
// ignore return value & error here:
// we don't care about the returned key and
// error is only ever if a duplicate is attempted to be added -- we don't care
tree.Insert(post.ID, post)
}
}
I'm able to make one request to the endpoint, but as soon as try to submit a second request, my program fails with panic: sync: negative WaitGroup counter
.
My question is why exactly is my WaitGroup counter going negative? I make sure to add to the waitgroup and mark when my goroutines are done.
If the waitgroup is negative on the second request, then that must mean that the first time I allocate a waitgroup and add 4 to it is being skipped... why? Does this have something to do with closing channels, maybe? And if so, where do I close a channel?
Also -- does anyone have tips for debugging goroutines?
Thanks for your help.
CodePudding user response:
Firstly, the whole design is quite complicated. Mentioned my thoughts towards the end.
There are 2 problems in your code:
posts
channel is never closed, due to whichaddPostToTree
might never be existing the loop, resulting in one waitGroup never decreasing (In your case the program hangs). There is a chance the program waits indefinitely with a deadlock (Thinking other goroutine will release it, but all goroutines are stuck).
Solution: You can close thepostChan
channel. But how? Its always recommended for the producer to always close the channel, but you've multiple producers. So the best option is, wait for all producers to finish and then close the channel. In order to wait for all producers to finish, you'll need to create another waitGroup and use that to track the child routines.
Code:
// fetch completes a GET request to the endpoint
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {
postsWG := &sync.WaitGroup{}
for tag := range tags {
ep, err := formURL(tag)
if err != nil {
errs <- err
}
postsWG.Add(1) // QUESTION should I use a separate wait group here?
go func() {
resp, err := http.Get(ep.String())
if err != nil {
errs <- err
}
container := models.PostContainer{}
err = json.NewDecoder(resp.Body).Decode(&container)
defer resp.Body.Close()
go insertPosts(posts, container.Posts, postsWG)
}()
}
defer func() {
postsWG.Wait()
close(posts)
group.Done()
}()
}
- Now, we've another issue, the main waitGroup should be initialized with
3
instead of4
. This is because the main routine is only spinning up 3 more routineswg.Add(3)
, so it has to keep track of only those. For child routines, we're using a different waitGroup, so that is not the headache of the parent anymore.
Code:
errChan := make(chan error) // for synchronizing errors across goroutines
wg := &sync.WaitGroup{} // for synchronizing goroutines
wg.Add(3)
// create a go func to synchronize our wait groups
// once all goroutines are finished, we can close our errChan
TLDR --
Complex Design - As the main wait group is started at one place, but each goroutine is modifying this waitGroup as it feels necessary. So there is no single owner for this, which makes debugging and maintaining super complex ( can't ensure it'll be bug free).
I would recommend breaking this down and have separate trackers for each child routines. That way, the caller who is spinning up more routines can only concentrate on tracking its child goroutines. This routine will then inform its parent waitGroup only after its done (& its child is done, rather than letting the child routinue informing the grandparent directly).
Also, in fetch
method after making the HTTP call and getting the response, why create another goroutine to process this data? Either way this goroutine cannot exit until the data insertion happens, nor is it doing someother action which data processing happens. From what I understand, The second goroutine is redundant.
group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
go insertPosts(posts, container.Posts, group)
defer group.Done()