I have a large number of directories, each containing hundreds to thousands of files. I want to loop through the list of directories and start a goroutine for each one that scans the directory for files and adds the path of each file to a job queue for a set of workers to process.
This is what I have so far:
type AppConfig struct {
UploadPath string `mapstructure:"upload_path"`
LocalPath string `mapstructure:"local_path"`
Bucket string `mapstructure:"bucket"`
}
func consumer(i int, jobs <-chan *ops.Job) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker: %v is processing file: %v\n", i, job.Work)
}
}
func producer(jobs chan<- *ops.Job, filesToTransfer []string) {
for i, file := range filesToTransfer {
jobs <- &ops.Job{Id: i, Work: file}
}
}
func main() {
var (
appconfigs map[string]*ops.AppConfig
wg *sync.WaitGroup
)
jobs := make(chan *ops.Job)
// setting up workers
for i := 0; i < 10; i++ {
wg.Add(1)
go consumer(i, jobs)
}
// adding jobs
for _, values := range appconfigs {
filesToTransfer := ops.ScanUploadPath(values.LocalPath)
go producer(jobs, filesToTransfer)
}
go func() {
wg.Wait()
close(jobs)
}()
}
Previously, when I had the close(jobs) call in my producer function, I was running into a deadlock and a panic on the closed channel. I read that I should have this in my main() instead:
go func() {
wg.Wait()
close(jobs)
}()
I don't really understand why I need a separate goroutine outside of my producer. I was hoping someone could explain why.
Answer:
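The separate goroutine does not fix the problem, it only masks it. A Go program exits as soon as the main function returns, without waiting for other goroutines. With wg.Wait() and close(jobs) moved into a goroutine, main returns immediately, so the program exits before the deadlock or panic can show up (and possibly before the workers have processed anything). The order inside that goroutine is also backwards: the workers only return once jobs is closed, so waiting for them before closing the channel can never finish.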
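The panic described in the question is consistent with several producer goroutines sharing one channel: once one producer closes it, any other producer that is still sending panics. The following is a minimal sketch of that failure, not the question's actual code. The helper name sendAfterClose and the string channel are assumptions made for illustration.

```go
package main

import "fmt"

// sendAfterClose (hypothetical helper) reports whether sending on an
// already-closed channel panics.
func sendAfterClose() (panicked bool) {
	defer func() {
		// recover turns the runtime panic into a return value.
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	jobs := make(chan string, 1)
	close(jobs)     // stands in for the first producer finishing and closing
	jobs <- "b.txt" // stands in for a second producer that is still sending
	return false
}

func main() {
	fmt.Println("send on closed channel panicked:", sendAfterClose())
}
```

A second close(jobs) on the same channel panics as well, so a shared channel should be closed exactly once, by whichever code knows that every sender has finished.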
Use this code:
// Start workers.
var wg sync.WaitGroup
jobs := make(chan *ops.Job)
for i := 0; i < 10; i++ {
wg.Add(1)
go consumer(&wg, i, jobs)
}
// Send jobs to the workers.
for _, values := range appconfigs {
filesToTransfer := ops.ScanUploadPath(values.LocalPath)
// Send the jobs from the main goroutine.
// Nothing is gained by using a goroutine
// as in the question.
for i, file := range filesToTransfer {
jobs <- &ops.Job{Id: i, Work: file}
}
}
// Close the channel to signal that all jobs are
// sent.
close(jobs)
// Wait for the workers to complete.
wg.Wait()
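If one producer goroutine per directory is still wanted, one possible variant (not the code from the answer above) is to track the producers with their own WaitGroup and close the channel only after all of them have returned. The sketch below is self-contained under some assumptions: Job is a local stand-in for ops.Job, the in-memory dirs map replaces ops.ScanUploadPath, and the run function and processed counter are hypothetical additions so the result can be checked. The consumer takes a *sync.WaitGroup, matching the consumer(&wg, i, jobs) call above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Job is a local stand-in for ops.Job from the question.
type Job struct {
	Id   int
	Work string
}

// consumer drains jobs until the channel is closed, then signals wg.
func consumer(wg *sync.WaitGroup, id int, jobs <-chan *Job, processed *int64) {
	defer wg.Done()
	for job := range jobs {
		_ = job // real work (for example an upload) would go here
		atomic.AddInt64(processed, 1)
	}
}

// producer sends one job per file and signals wg when it is done.
// It never closes the channel, because other producers may still be sending.
func producer(wg *sync.WaitGroup, jobs chan<- *Job, files []string) {
	defer wg.Done()
	for i, file := range files {
		jobs <- &Job{Id: i, Work: file}
	}
}

// run (hypothetical) starts the workers and one producer per directory,
// and returns the number of jobs the workers processed.
func run(dirs map[string][]string, numWorkers int) int64 {
	var producers, workers sync.WaitGroup
	var processed int64
	jobs := make(chan *Job)

	for i := 0; i < numWorkers; i++ {
		workers.Add(1)
		go consumer(&workers, i, jobs, &processed)
	}
	for _, files := range dirs {
		producers.Add(1)
		go producer(&producers, jobs, files)
	}

	producers.Wait() // every job has been sent
	close(jobs)      // closed exactly once, after the last send
	workers.Wait()   // every job has been processed
	return atomic.LoadInt64(&processed)
}

func main() {
	// Hypothetical file lists standing in for scanned directories.
	dirs := map[string][]string{
		"a": {"a/1.txt", "a/2.txt"},
		"b": {"b/1.txt"},
	}
	fmt.Println("processed:", run(dirs, 3))
}
```

Here main blocks on both WaitGroups, so no extra goroutine is needed to close the channel. The ordering is what matters: wait for the senders, close the channel, then wait for the receivers.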