How do I terminate an infinite loop from inside of a goroutine?

Time:08-13

I'm writing an app in Go that interacts with Spotify's API, and I need an infinite for loop that calls an endpoint until the length of the returned slice is less than the limit, which signals that I've reached the end of the available entries.

My user account has 1644 saved albums (I determined this by looping through without using goroutines). However, when I add goroutines, I get back 2544 saved albums, including duplicates. I'm also using the semaphore pattern to limit the number of goroutines so that I don't exceed the rate limit.

I assume the issue is that I'm using the active variable rather than channels, but my attempt with channels just resulted in an infinite loop:

wg := &sync.WaitGroup{}
sem := make(chan bool, 20)
active := true
offset := 0
for {
    sem <- true
    if active {
        // add each new goroutine to waitgroup
        wg.Add(1)
        go func() error {
            // remove from waitgroup when goroutine is complete
            defer wg.Done()
            // release the worker
            defer func() { <-sem }()
            savedAlbums, err := client.CurrentUsersAlbums(ctx, spotify.Limit(50), spotify.Offset(offset))
            if err != nil {
                return err
            }
            userAlbums = append(userAlbums, savedAlbums.Albums...)
            if len(savedAlbums.Albums) < 50 {
                // since the limit is set to 50, we know that if the number of returned albums
                // is less than 50 that we're done retrieving data
                active = false
                return nil
            } else {
                offset += 50
                return nil
            }
        }()
    } else {
        wg.Wait()
        break
    }
}

Thanks in advance!

Answer:

  1. Pass offset as an argument, i.e. go func(offset int) error { ... }(offset).
  2. Increment offset by 50 after launching the goroutine.
  3. Change the type of active to chan bool.

Here is an example: https://go.dev/play/p/RhZMDYmsVD3

Applied to your code:

wg := &sync.WaitGroup{}
mu := &sync.Mutex{} // guards userAlbums, which several goroutines append to
worker := 20
active := make(chan bool, worker)

for i := 0; i < worker; i++ {
    active <- true
}
offset := 0
for {
    if <-active {
        // add each new goroutine to waitgroup
        wg.Add(1)
        go func(offset int) error {
            // remove from waitgroup when goroutine is complete
            defer wg.Done()
            savedAlbums, err := client.CurrentUsersAlbums(ctx, spotify.Limit(50), spotify.Offset(offset))
            if err != nil {
                // active <- false // maybe you need this: without a token sent back,
                // the pool shrinks, and if every worker fails the loop blocks forever
                return err
            }
            mu.Lock()
            userAlbums = append(userAlbums, savedAlbums.Albums...)
            mu.Unlock()
            if len(savedAlbums.Albums) < 50 {
                // since the limit is set to 50, we know that if the number of returned albums
                // is less than 50 that we're done retrieving data
                active <- false
                return nil
            } else {
                active <- true
                return nil
            }
        }(offset)
        offset += 50 // safe: the goroutine above already received its own copy
    } else {
        wg.Wait()
        break
    }
}
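
To try the pattern above without a Spotify client, here is a minimal runnable simulation. fetchPage is a hypothetical stand-in for client.CurrentUsersAlbums that pretends the account holds 1644 albums (the count from the question). The results are guarded by a sync.Mutex, because appending to a shared slice from several goroutines is otherwise a data race, and they are sorted at the end because goroutines finish in any order.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// pageSize mirrors spotify.Limit(50) in the answer above.
const pageSize = 50

// fetchPage is a hypothetical stand-in for client.CurrentUsersAlbums:
// it pretends the account has `total` albums (IDs 0..total-1) and
// returns the IDs in [offset, offset+limit).
func fetchPage(offset, limit, total int) []int {
	var page []int
	for i := offset; i < offset+limit && i < total; i++ {
		page = append(page, i)
	}
	return page
}

// collectAll applies the token-channel pattern: true means "keep going",
// false means "some goroutine saw a short page".
func collectAll(total, workers int) []int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards result
		result []int
	)
	active := make(chan bool, workers)
	for i := 0; i < workers; i++ {
		active <- true
	}
	offset := 0
	for <-active {
		wg.Add(1)
		go func(offset int) { // offset is copied when the goroutine is launched
			defer wg.Done()
			page := fetchPage(offset, pageSize, total)
			mu.Lock()
			result = append(result, page...)
			mu.Unlock()
			// each goroutine sends exactly one token back, so the buffer never fills up
			active <- len(page) == pageSize
		}(offset)
		offset += pageSize
	}
	wg.Wait()
	sort.Ints(result) // completion order is nondeterministic
	return result
}

func main() {
	fmt.Println(len(collectAll(1644, 20))) // 1644
}
```

Since offsets are handed out in increasing order, a false token can only come from a page at or past the end, so every earlier page has already been launched by the time the loop stops.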

Answer:

I suspect that your main issue may be a misunderstanding of what the go keyword does; from the docs:

A "go" statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same address space.

So go func() error { ... }() starts the closure as a new goroutine and returns immediately; it does not mean that any of the closure's code runs before the loop continues. Because client.CurrentUsersAlbums takes a while, the loop will usually launch all 20 goroutines before any of them has updated offset, so it's likely you will be requesting the first 50 items 20 times. This can be demonstrated with a simplified version of your application (playground):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    wg := &sync.WaitGroup{}
    sem := make(chan bool, 20)
    active := true
    offset := 0
    for {
        sem <- true
        if active {
            // add each new goroutine to waitgroup
            wg.Add(1)
            go func() error {
                // remove from waitgroup when goroutine is complete
                defer wg.Done()
                // release the worker
                defer func() { <-sem }()
                fmt.Println("Getting from:", offset)
                time.Sleep(time.Millisecond) // Simulate the query
                // Pretend that we got back 50 albums
                // (deliberately unsynchronised, as in the question)
                offset += 50
                if offset > 2000 {
                    active = false
                }
                return nil
            }()
        } else {
            wg.Wait()
            break
        }
    }
}

Running this will produce somewhat unpredictable results (note that the playground caches results, so try it on your machine), but you will probably see "Getting from: 0" printed 20 times.
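
The usual fix (also suggested in the other answer) relies on another rule in the spec: the function value and arguments of a go statement are evaluated in the calling goroutine when the statement executes. A minimal sketch, with no API calls and only offsets, showing that passing offset as a parameter gives each goroutine its own copy:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// launchWithArgs starts n goroutines, passing each one the current offset
// as an argument; the argument is evaluated at the go statement, so later
// changes to offset do not affect goroutines that are already launched.
func launchWithArgs(n, step int) []int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards seen
		seen []int
	)
	offset := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(off int) {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, off)
			mu.Unlock()
		}(offset) // offset is read here, not when the goroutine runs
		offset += step // only the launching goroutine touches offset
	}
	wg.Wait()
	sort.Ints(seen) // goroutines may record their offsets in any order
	return seen
}

func main() {
	fmt.Println(launchWithArgs(5, 50)) // [0 50 100 150 200]
}
```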

A further issue is data races: offset, active and userAlbums are all updated from multiple goroutines without protection (e.g. a sync.Mutex), which results in undefined behaviour.
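
A minimal sketch of mutex-based protection, with made-up integer items standing in for albums: 20 goroutines append to one shared slice, and the lock makes the appends happen one at a time. Running an unguarded version with go run -race should make the race detector report the problem.

```go
package main

import (
	"fmt"
	"sync"
)

// appendConcurrently has `workers` goroutines each append `perWorker`
// items to one shared slice. The mutex serialises the appends; without it
// concurrent appends race on the slice and items can be lost.
func appendConcurrently(workers, perWorker int) []int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards shared
		shared []int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				mu.Lock()
				shared = append(shared, w*perWorker+i) // unique value per item
				mu.Unlock()
			}
		}(w)
	}
	wg.Wait()
	return shared
}

func main() {
	fmt.Println(len(appendConcurrently(20, 50))) // 1000
}
```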

You will want to know how to fix this, but unfortunately you will need to rethink your algorithm. Currently the process you are following is:

  1. Set pos to 0
  2. Get 50 records starting from pos
  3. If we got 50 records, set pos = pos + 50 and loop back to step 2
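
Written out as Go, the three steps above look roughly like the following sketch. fetchPage is a hypothetical stand-in for client.CurrentUsersAlbums that pretends the account holds 1644 albums (the figure from the question); the point is that the next pos is only known once the current page has come back.

```go
package main

import "fmt"

const limit = 50

// fetchPage is a hypothetical stand-in for client.CurrentUsersAlbums that
// pretends the account holds `total` albums (IDs 0..total-1).
func fetchPage(offset, total int) []int {
	var page []int
	for i := offset; i < offset+limit && i < total; i++ {
		page = append(page, i)
	}
	return page
}

// fetchAllSequential follows the steps literally: start at pos 0, fetch
// 50 records, and advance only after seeing a full page.
func fetchAllSequential(total int) []int {
	var all []int
	pos := 0
	for {
		page := fetchPage(pos, total)
		all = append(all, page...)
		if len(page) < limit { // short page: nothing left after this one
			return all
		}
		pos += limit
	}
}

func main() {
	fmt.Println(len(fetchAllSequential(1644))) // 1644
}
```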

This is a sequential algorithm: you don't know whether there is more data until the request for the previous section has returned. I guess you could make speculative queries (and handle the failures), but a better solution would be to find some way to determine the number of results expected and then split the queries for that number of records between multiple goroutines.
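
A minimal sketch of that suggestion, under two assumptions: the first response reports the total number of records (Spotify's Web API paging responses include a total count, but check how your client exposes it), and fetchPage is a hypothetical stand-in for the real call. One request learns the total; after that every offset is known up front, so the remaining pages are fetched concurrently, with a buffered channel acting as a semaphore to cap the requests in flight.

```go
package main

import (
	"fmt"
	"sync"
)

const limit = 50

// fetchPage is a hypothetical stand-in for the real API call. It pretends
// the account holds `simulated` records and returns the page starting at
// offset together with the total count a paging response would report.
func fetchPage(offset, simulated int) ([]int, int) {
	var page []int
	for i := offset; i < offset+limit && i < simulated; i++ {
		page = append(page, i)
	}
	return page, simulated
}

// fetchAllParallel makes one request to learn the total, then fetches the
// remaining pages concurrently with at most maxInFlight requests running.
func fetchAllParallel(simulated, maxInFlight int) []int {
	first, total := fetchPage(0, simulated)
	if total <= limit {
		return first // everything fit in the first page
	}
	numPages := (total + limit - 1) / limit
	pages := make([][]int, numPages) // one slot per page, in offset order
	pages[0] = first

	var wg sync.WaitGroup
	sem := make(chan struct{}, maxInFlight) // counting semaphore
	for p := 1; p < numPages; p++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxInFlight requests are running
		go func(p int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			pages[p], _ = fetchPage(p*limit, simulated)
		}(p)
	}
	wg.Wait()

	var all []int
	for _, page := range pages {
		all = append(all, page...)
	}
	return all
}

func main() {
	fmt.Println(len(fetchAllParallel(1644, 20))) // 1644
}
```

Because each goroutine writes only to its own element of pages, no mutex is needed, and the records come back in offset order rather than completion order.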
