I'd like to track the execution of some long-running processes and show the user the completion percentage and errors (if any). If there's only one long-running process, it's easy: you can create channels for progress (percentage) and errors. What would be the correct way to implement this logic when we have X long-running processes?
Below is a snippet of code that works, but I don't really like how it's implemented.
I created a struct ProgressTracker that keeps Url (as a field) and Error and Progress as channels. I keep each ProgressTracker in a slice, and once I have submitted all tasks I iterate over the slice of ProgressTracker and listen to the channels of each tracker. Once the number of submitted requests equals the number of received responses, I exit the loop.
Is this an idiomatic Go solution? It would be easier to pass ProgressTracker to the function as a channel, but I don't know how to properly send "progress", "error" and "complete" events in that case.
The code is below, the same is available in Go playground: https://go.dev/play/p/f3hXJsZR9WV
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

type ProgressTracker struct {
	Progress chan int
	Error    chan error
	Complete chan bool
	Url      string
}

/**
This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel
For .net sites on 3rd iteration fail with error
When everything is completed, send a message to Complete channel
*/
func work(url string, tracker *ProgressTracker) {
	tracker.Url = url
	fmt.Printf("processing url %s\n", url)
	for i := 1; i <= 5; i++ {
		time.Sleep(time.Second)
		if i == 3 && strings.HasSuffix(url, ".net") {
			tracker.Error <- errors.New("emulating error for .net sites")
			tracker.Complete <- true
		}
		progress := 20 * i
		tracker.Progress <- progress
	}
	tracker.Complete <- true
}

func main() {
	var trackers []*ProgressTracker
	var urls = []string{"google.com", "youtube.com", "someurl.net"}
	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, url := range urls {
		tracker := &ProgressTracker{
			Progress: make(chan int),
			Error:    make(chan error),
			Complete: make(chan bool),
		}
		trackers = append(trackers, tracker)
		go func(workUrl string, progressTracker *ProgressTracker) {
			work(workUrl, progressTracker)
		}(url, tracker)
	}
	go func() {
		wg.Wait()
	}()
	var processed = 0
	// iterate through all trackers and select each channel.
	// Exit from this loop when number of processed requests equals the number of trackers
	for {
		for _, t := range trackers {
			select {
			case pr := <-t.Progress:
				fmt.Printf("Url = %s, progress = %d\n", t.Url, pr)
			case err := <-t.Error:
				fmt.Printf("Url = %s, error = %s\n", t.Url, err.Error())
			case <-t.Complete:
				fmt.Printf("Url = %s is completed\n", t.Url)
				processed = processed + 1
				if processed == len(trackers) {
					fmt.Printf("Everything is completed, exit")
					return
				}
			}
		}
	}
}
UPD: If I add a delay to one of the tasks, then the for loop where I select on all the channels also waits for the slowest worker on each iteration. Go playground: https://go.dev/play/p/9FvDE7ZGIrP Updated work function:
func work(url string, tracker *ProgressTracker) {
	tracker.Url = url
	fmt.Printf("processing url %s\n", url)
	for i := 1; i <= 5; i++ {
		if url == "google.com" {
			time.Sleep(time.Second * 3)
		}
		time.Sleep(time.Second)
		if i == 3 && strings.HasSuffix(url, ".net") {
			tracker.Error <- errors.New("emulating error for .net sites")
			tracker.Complete <- true
			return
		}
		progress := 20 * i
		tracker.Progress <- progress
	}
	tracker.Complete <- true
}
CodePudding user response:
You're deadlocking because you keep selecting against trackers that have finished, with no default case. Your inner for loop iterates over all trackers every time, including trackers that are done and will never send another message. The easiest way out of this is an empty default, which would also make these behave better in real life, where they don't all go at the same pace, but it does turn this into a tight loop that will consume more CPU.
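To illustrate what an empty default changes, here is a minimal sketch of a non-blocking receive. The poll helper and the two channels are hypothetical stand-ins, not part of the question's code; the point is only that select falls through to default when nothing is ready instead of blocking on a finished tracker.

```go
package main

import "fmt"

// poll does one non-blocking receive on ch (hypothetical helper).
// It returns the value and true if a message was ready, or 0 and false otherwise.
func poll(ch chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		// Nothing is ready: return immediately instead of blocking.
		return 0, false
	}
}

func main() {
	idle := make(chan int)    // stands in for a tracker whose worker has finished
	busy := make(chan int, 1) // stands in for a tracker with a pending message
	busy <- 40

	for _, ch := range []chan int{idle, busy} {
		if v, ok := poll(ch); ok {
			fmt.Println("progress:", v)
		} else {
			fmt.Println("nothing ready, moving on")
		}
	}
}
```

Calling poll in a loop over all trackers never blocks, which is exactly why it turns into a busy loop unless something else paces it.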
Your WaitGroup doesn't do anything at all; you're calling Wait in a goroutine but doing nothing when it returns, and you never call Done in the goroutines it's tracking, so it will never return. Instead, you're separately tracking the number of Complete messages you get and using that instead of the WaitGroup; it's unclear why this is implemented this way.
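For reference, a minimal sketch of the usual WaitGroup pattern: every goroutine calls Done, and the caller actually acts on Wait returning. The runAll function and its process callback are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll starts one goroutine per URL and blocks until all of them have
// called Done. process is a hypothetical stand-in for the real work.
func runAll(urls []string, process func(url string)) int32 {
	var wg sync.WaitGroup
	var finished int32

	for _, u := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done() // without this call, Wait never returns
			process(u)
			atomic.AddInt32(&finished, 1)
		}(u)
	}

	wg.Wait() // returns only after every goroutine has called Done
	return atomic.LoadInt32(&finished)
}

func main() {
	n := runAll([]string{"google.com", "youtube.com", "someurl.net"}, func(url string) {
		fmt.Println("processing", url)
	})
	fmt.Println("finished workers:", n)
}
```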
Fixing both resolves the stated issue: https://go.dev/play/p/do0g9jrX0mY
However, that's probably not the right approach. It's impossible to say with a contrived example what the right approach would be; if the example is all it needs to do, you don't need any of the logic: you could put your print statements in the workers, use a WaitGroup and no channels, and be done with it. Assuming you're actually doing something with the results, you probably want a single Completed channel and a single Error channel shared by all the workers, and possibly a different mechanism altogether for tracking progress, like an atomic int/float you can just read from when you want to know the current progress. Then you don't need the nested looping stuff; you just have one loop with one select to read messages from the shared channels. It all depends on the context in which this code is intended to be used.
CodePudding user response:
Thank you for your answers! I came up with this approach and it works for my needs:
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

type ProgressTracker struct {
	Progress  int
	Error     error
	Completed bool
	Url       string
}

/**
This function sleeps for 1 second in each iteration and sends the updated state (progress in %) to the tracker channel
For .net sites it fails with an error on the 3rd iteration
When everything is completed, it sends a final state with Completed set to true
*/
func work(url string, tracker chan ProgressTracker) {
	var internalTracker = ProgressTracker{
		Url: url,
	}
	tracker <- internalTracker
	fmt.Printf("processing url %s\n", url)
	for i := 1; i <= 5; i++ {
		if url == "google.com" {
			time.Sleep(time.Second * 3)
		}
		time.Sleep(time.Second)
		if i == 3 && strings.HasSuffix(url, ".net") {
			internalTracker.Error = errors.New("error for .net sites")
			internalTracker.Completed = true
			tracker <- internalTracker
			return
		}
		progress := 20 * i
		internalTracker.Progress = progress
		internalTracker.Completed = false
		tracker <- internalTracker
	}
	internalTracker.Completed = true
	tracker <- internalTracker
}

func main() {
	var urls = []string{"google.com", "youtube.com", "someurl.net"}
	var tracker = make(chan ProgressTracker, len(urls))
	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, url := range urls {
		go func(workUrl string) {
			defer wg.Done()
			work(workUrl, tracker)
		}(url)
	}
	go func() {
		wg.Wait()
		close(tracker)
		fmt.Printf("After wg wait")
	}()
	var completed = 0
	for completed < len(urls) {
		select {
		case t := <-tracker:
			if t.Completed {
				fmt.Printf("Processing for %s is completed!\n", t.Url)
				completed = completed + 1
			} else {
				fmt.Printf("Processing for %s is in progress: %d\n", t.Url, t.Progress)
			}
			if t.Error != nil {
				fmt.Printf("Url %s has errors %s\n", t.Url, t.Error)
			}
		}
	}
}
Here I pass ProgressTracker through a channel (the fields in ProgressTracker are declared as plain fields, not channels), and on each event the work function sends the complete state of what is going on: if progress increased, it sets the new value and sends the structure to the channel; if an error happened, it sets the error and sends the structure; and so on.
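Since the channel is closed once all workers have finished, the consumer could probably also range over it instead of counting completions. A minimal sketch of that variation, with a hypothetical update type standing in for ProgressTracker and the sleeps left out:

```go
package main

import (
	"fmt"
	"sync"
)

// update is a hypothetical, trimmed-down stand-in for ProgressTracker.
type update struct {
	URL       string
	Progress  int
	Completed bool
}

// worker sends a state snapshot after each step and a final completed one.
func worker(url string, out chan<- update) {
	for i := 1; i <= 5; i++ {
		out <- update{URL: url, Progress: 20 * i}
	}
	out <- update{URL: url, Progress: 100, Completed: true}
}

// collect runs one worker per URL and returns the last progress seen for each.
func collect(urls []string) map[string]int {
	out := make(chan update)
	var wg sync.WaitGroup
	for _, u := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done()
			worker(u, out)
		}(u)
	}
	// Close the channel once every worker is done so the range loop ends.
	go func() {
		wg.Wait()
		close(out)
	}()

	last := make(map[string]int)
	for up := range out {
		last[up.URL] = up.Progress
	}
	return last
}

func main() {
	for url, p := range collect([]string{"google.com", "youtube.com"}) {
		fmt.Printf("Url = %s, final progress = %d\n", url, p)
	}
}
```

The range loop exits when the channel is closed and drained, so no separate counter is needed.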