Home > Software engineering >  Track progress of long running tasks - correct approach
Track progress of long running tasks - correct approach

Time:06-27

I'd like to track execution of some long running process and show the user completion percentage and errors (if any). If it's one long running process, then it's easy - you can create channels for progress (percentage) and error. What would the correct way to implement such logic when we have X long running processes?

Below is a snippet of code that works, but I don't really like how it's implemented. I created a struct ProgressTracker that keeps Url (as a field), Error, Progress as channels. I keep such ProgressTracker in a slice and once I submit all tasks I iterate via the slice of ProgressTracker and listen to channels for each tracker in ProgressTracker. Once the number of submitted requests == number of received responses - exit the loop.

Is it Go idiomatic solution? It would be easier to pass ProgressTracker to the function as a channel, but I don't know how to properly send "progress", "error" and "complete" events in such case.

The code is below, the same is available in Go playground: https://go.dev/play/p/f3hXJsZR9WV

package main

import (
    "errors"
    "fmt"
    "strings"
    "sync"
    "time"
)

type ProgressTracker struct {
    Progress chan int
    Error    chan error
    Complete chan bool
    Url      string
}

/**
This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel
For .net sites on 3rd iteration fail with error
When everything is completed, send a message to Complete channel
*/
func work(url string, tracker *ProgressTracker) {
    tracker.Url = url
    fmt.Printf("processing url %s\n", url)
    for i := 1; i <= 5; i   {
        time.Sleep(time.Second)
        if i == 3 && strings.HasSuffix(url, ".net") {
            tracker.Error <- errors.New("emulating error for .net sites")
            tracker.Complete <- true
        }
        progress := 20 * i
        tracker.Progress <- progress
    }
    tracker.Complete <- true
}

func main() {
    var trackers []*ProgressTracker
    var urls = []string{"google.com", "youtube.com", "someurl.net"}
    var wg sync.WaitGroup
    
    wg.Add(len(urls))
    for _, url := range urls {
        tracker := &ProgressTracker{
            Progress: make(chan int),
            Error:    make(chan error),
            Complete: make(chan bool),
        }
        trackers = append(trackers, tracker)
        go func(workUrl string, progressTracker *ProgressTracker) {
            work(workUrl, progressTracker)
        }(url, tracker)
    }

    go func() {
        wg.Wait()
    }()

    var processed = 0

    //iterate through all trackers and select each channel.
    //Exit from this loop when number of processed requests equals the number of trackers
    for {
        for _, t := range trackers {
            select {
            case pr := <-t.Progress:
                fmt.Printf("Url = %s, progress = %d\n", t.Url, pr)
            case err := <-t.Error:
                fmt.Printf("Url = %s, error = %s\n", t.Url, err.Error())
            case <-t.Complete:
                fmt.Printf("Url = %s is completed\n", t.Url)
                processed = processed   1
                if processed == len(trackers) {
                    fmt.Printf("Everything is completed, exit")
                    return
                }
            }
        }
    }
}

UPD: If I add a delay to one of the tasks, then the for loop where I select all the channels will also wait for the slowest worker on each iteration. Go playground: https://go.dev/play/p/9FvDE7ZGIrP Updated work function:

func work(url string, tracker *ProgressTracker) {
    tracker.Url = url
    fmt.Printf("processing url %s\n", url)
    for i := 1; i <= 5; i   {
        if url == "google.com" {
            time.Sleep(time.Second * 3)
        }
        time.Sleep(time.Second)
        if i == 3 && strings.HasSuffix(url, ".net") {
            tracker.Error <- errors.New("emulating error for .net sites")
            tracker.Complete <- true
            return
        }
        progress := 20 * i
        tracker.Progress <- progress
    }
    tracker.Complete <- true
}

CodePudding user response:

You're deadlocking because you're continuing to select against trackers that have finished with no default. Your inner for loop iterates all trackers every time, which includes trackers that are done and are never going to send another message. The easiest way out of this is an empty default, which would also make these behave better in real life where they don't all go at the same pace, but it does turn this into a tight loop which will consume more CPU.

Your WaitGroup doesn't do anything at all; you're calling Wait in a goroutine but doing nothing when it returns, and you never call Done in the goroutines it's tracking, so it will never return. Instead, you're separately tracking the number of Complete messages you get and using that instead of the WaitGroup; it's unclear why this is implemented this way.

Fixing both resolves the stated issue: https://go.dev/play/p/do0g9jrX0mY

However, that's probably not the right approach. It's impossible to say with a contrived example what the right approach would be; if the example is all it needs to do, you don't need any of the logic, you could put your print statements in the workers and just use a waitgroup and no channels and be done with it. Assuming you're actually doing something with the results, you probably want a single Completed channel and a single Error channel shared by all the workers, and possibly a different mechanism altogether for tracking progress, like an atomic int/float you can just read from when you want to know the current progress. Then you don't need the nested looping stuff, you just have one loop with one select to read messages from the shared channels. It all depends on the context in which this code is intended to be used.

CodePudding user response:

Thank you for your answers! I came up with this approach and it works for my needs:

package main

import (
    "errors"
    "fmt"
    "strings"
    "sync"
    "time"
)

type ProgressTracker struct {
    Progress  int
    Error     error
    Completed bool
    Url       string
}

/**
This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel
For .net sites on 3rd iteration fail with error
When everything is completed, send a message to Complete channel
*/
func work(url string, tracker chan ProgressTracker) {
    var internalTracker = ProgressTracker{
        Url: url,
    }
    tracker <- internalTracker
    fmt.Printf("processing url %s\n", url)
    for i := 1; i <= 5; i   {
        if url == "google.com" {
            time.Sleep(time.Second * 3)
        }
        time.Sleep(time.Second)
        if i == 3 && strings.HasSuffix(url, ".net") {
            internalTracker.Error = errors.New("error for .net sites")
            internalTracker.Completed = true
            tracker <- internalTracker
            return
        }
        progress := 20 * i
        internalTracker.Progress = progress
        internalTracker.Completed = false
        tracker <- internalTracker
    }
    internalTracker.Completed = true
    tracker <- internalTracker
}

func main() {
    var urls = []string{"google.com", "youtube.com", "someurl.net"}
    var tracker = make(chan ProgressTracker, len(urls))
    var wg sync.WaitGroup
    wg.Add(len(urls))

    for _, url := range urls {
        go func(workUrl string) {
            defer wg.Done()
            work(workUrl, tracker)
        }(url)
    }

    go func() {
        wg.Wait()
        close(tracker)
        fmt.Printf("After wg wait")
    }()

    var completed = 0

    for completed < len(urls) {
        select {
        case t := <-tracker:
            if t.Completed {
                fmt.Printf("Processing for %s is completed!\n", t.Url)
                completed = completed   1
            } else {
                fmt.Printf("Processing for %s is in progress: %d\n", t.Url, t.Progress)
            }
            if t.Error != nil {
                fmt.Printf("Url %s has errors %s\n", t.Url, t.Error)
            }
        }

    }
}

Here I pass ProgressTracker as a channel (fields in ProgressTracker are declared as simple fields, not channels) and on each event from work function return a complete state of what's is going on (if progress increased - set new value and return the a structure to channel, if error happened - set the error and return the structure, etc).

  • Related