Why do I need a wg.Wait() and close() in a separate goroutine?

Time:10-08

I have a large number of directories, each containing hundreds to thousands of files. I want to loop through the list of directories and start a goroutine for each one that scans the directory for files and adds the path of each file to a job queue for a set of workers to process.

This is what I have so far:

type AppConfig struct {
    UploadPath string `mapstructure:"upload_path"`
    LocalPath  string `mapstructure:"local_path"`
    Bucket     string `mapstructure:"bucket"`
}

func consumer(i int, jobs <-chan *ops.Job) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker: %v is processing file: %v\n", i, job.Work)
    }
}

func producer(jobs chan<- *ops.Job, filesToTransfer []string) {
    for i, file := range filesToTransfer {
        jobs <- &ops.Job{Id: i, Work: file}
    }
}

func main() {
    var (
        appconfigs map[string]*ops.AppConfig
        wg *sync.WaitGroup
    )

    jobs := make(chan *ops.Job)

    // setting up workers
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go consumer(i, jobs)
    }

    // adding jobs
    for _, values := range appconfigs {
        filesToTransfer := ops.ScanUploadPath(values.LocalPath)
        go producer(jobs, filesToTransfer)

    }

    go func() {
        wg.Wait()
        close(jobs)
    }()
}

Earlier, when I called close(jobs) inside my producer function, I ran into a deadlock and a panic on a closed channel. I read that I should put this in my main() instead:

go func() {
    wg.Wait()
    close(jobs)
}()

I don't really understand why I need a separate goroutine outside of my producer. I was hoping someone could explain why.

CodePudding user response:

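Moving the calls to a goroutine only masks the problem: the program exits as soon as the main function returns, so the goroutine never runs long enough for the deadlock to show. The workers call wg.Done() only after jobs is closed, and the goroutine closes jobs only after wg.Wait() returns, so each side waits on the other.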

Use this code:

// Start workers.
var wg sync.WaitGroup
jobs := make(chan *ops.Job)
for i := 0; i < 10; i++ {
    wg.Add(1)
    go consumer(&wg, i, jobs)
}

// Send jobs to the workers.
for _, values := range appconfigs {
    filesToTransfer := ops.ScanUploadPath(values.LocalPath)

    // Send the jobs from the main goroutine.
    // Nothing is gained by using a goroutine
    // as in the question.
    for i, file := range filesToTransfer {
        jobs <- &ops.Job{Id: i, Work: file}
    }
}

// Close the channel to signal that all jobs are
// sent.
close(jobs)

// Wait for the workers to complete.
wg.Wait()