My consumer (run from main) supports context cancellation and reading from a channel via a case statement. I can shut down the consumer with the context, and that works fine. However, when I spawn several workers in one case statement, every worker is given the same job (message) from jobsChan, which is not what I want:
func (app *App) consumer() {
    for {
        select {
        case <-app.ctx.Done():
            app.infoLog.Print("Caught SIGINT, stopping.")
            app.wg.Wait()
            app.doneChan <- struct{}{} // main uses this channel to block itself until all goroutines are stopped
            app.infoLog.Print("Shutting down the consumer...")
            return
        case job := <-app.jobsChan:
            // PROBLEM here: wrong, each worker is given the same job
            for workerNumber := 0; workerNumber < app.config.workers; workerNumber++ {
                app.wg.Add(1)
                go app.workerFunc(workerNumber, job)
            }
        }
    }
}

func (app *App) workerFunc(id int, job Job) {
    defer app.wg.Done()
    // ... actual worker code here ...
}
How can I rewrite this code so that I keep the select on the app.ctx.Done channel and, at the same time, spawn workers so that each worker picks the next message from the channel as a Job? I need to keep the for/select to listen for ctx cancellation, but I also need to spawn X workers reading messages from jobsChan in the consumer. Is this possible?
The only alternative that comes to mind is passing the channel directly into the spawned workerFunc and having another for job := range app.jobsChan loop in workerFunc. But then the whole case job := <-app.jobsChan: in the consumer becomes pointless, and I am not sure how to rewrite it.
To clarify: when I run the app, I expect every worker to pull a new job ID from jobsChan, but they all process the same one, e.g. 1, then they all process the next one, e.g. 2:
#wrong
Worker 0: start processing item 1
Worker 2: start processing item 1
Worker 1: start processing item 1
CodePudding user response:
Your existing code explicitly assigns the same job to all the workers. If you have a fixed number of workers, create goroutines for them (during initialization), and have them listen to a channel:
for workerNumber := 0; workerNumber < app.config.workers; workerNumber++ {
    go app.workerFunc(ctx, workerNumber, app.jobsChan)
}
In each worker, select on both the jobs channel and the context's Done channel. In other words, you don't need the consumer; pass the jobs channel directly to the workers.