I'm just getting into concurrency in Go and trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel. If a message comes into my dispatch function via the dispatchchan channel and my other go routines are busy, the message is appended onto the stack slice in the dispatcher and the dispatcher will try to send again later when a worker becomes available, and/or no more messages are received on the dispatchchan. This is because the dispatchchan and the jobchan are unbuffered, and the go routine the workers are running will append other messages to the dispatcher up to a certain point and I don't want the workers blocked waiting on the dispatcher and creating a deadlock. Here's the dispatcher code I've come up with so far:
func dispatch() {
    var stack []string
    acount := 0
    for {
        select {
        case d := <-dispatchchan:
            stack = append(stack, d)
        case c := <-mw:
            acount = acount + c
        case jobchan <- stack[0]:
            if len(stack) > 1 {
                stack[0] = stack[len(stack)-1]
                stack = stack[:len(stack)-1]
            } else {
                stack = nil
            }
        default:
            if acount == 0 && len(stack) == 0 {
                close(jobchan)
                close(dispatchchan)
                close(mw)
                wg.Done()
                return
            }
        }
    }
}
Complete example at https://play.golang.wiki/p/X6kXVNUn5N7
The mw channel is a buffered channel the same length as the number of worker go routines. It acts as a semaphore for the worker pool. If the worker routine is doing [m]eaningful [w]ork it throws int 1 on the mw channel and when it finishes its work and goes back into the for loop listening to the jobchan it throws int -1 on the mw. This way the dispatcher knows if there's any work being done by the worker pool, or if the pool is idle. If the pool is idle and there are no more messages on the stack, then the dispatcher closes the channels and returns control to the main func.
This is all good but the issue I have is that the stack itself could be zero length so the case where I attempt to send stack[0] to the jobchan, if the stack is empty, I get an out of bounds error. What I'm trying to figure out is how to ensure that when I hit that case, either stack[0] has a value in it or not. I don't want that case to send an empty string to the jobchan.
Any help is greatly appreciated. If there's a more idiomatic concurrency pattern I should consider, I'd love to hear about it. I'm not 100% sold on this solution but this is the farthest I've gotten so far.
CodePudding user response:
This is all good but the issue I have is that the stack itself could be zero length so the case where I attempt to send stack[0] to the jobchan, if the stack is empty, I get an out of bounds error.
I can't reproduce it with your playground link, but it's believable, because at least one gofunc worker might have been ready to receive on that channel. My output has been Msgcnt: 0, which is also easily explained, because gofunc might not have been ready to receive on jobschan when dispatch() runs its select. The order of these operations is not defined.
trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel
A channel needs no dispatcher. A channel is the dispatcher.
If a message comes into my dispatch function via the dispatchchan channel and my other go routines are busy, the message is [...] will [...] send again later when a worker becomes available, [...] or no more messages are received on the dispatchchan.
With a few creative edits, it was easy to turn that into something close to the definition of a buffered channel. It can be read from immediately, or it can take up to some "limit" of messages that can't be immediately dispatched. You do define limit, though it's not used elsewhere within your code.
In any function, defining a variable you don't read will result in a compile-time error like limit declared but not used. This stricture improves code quality and helps identify typos. But at package scope, you've gotten away with defining the unused limit as a "global" and thus avoided a useful error: you haven't limited anything.
Don't use globals. Use passed parameters to define scope, because the definition of scope is tantamount to functional concurrency as expressed with the go keyword. Pass the relevant channels defined in local scope to functions defined at package scope so that you can easily track their relationships. And use directional channels to enforce the producer/consumer relationship between your functions. More on this later.
Going back to "limit", it makes sense to limit the quantity of jobs you're queueing because all resources are limited, and accepting more messages than you have any expectation of processing requires more durable storage than process memory provides. If you don't feel obligated to fulfill those requests no matter what, don't accept "too many" of them in the first place.
So then, what function do dispatchchan and dispatch() serve? To store a limited number of pending requests, if any, before they can be processed, and then to send them to the next available worker? That's exactly what a buffered channel is for.
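To make that concrete, here is a rough sketch of a buffered channel acting as the bounded queue. tryEnqueue is a hypothetical helper, and rejecting a job when the buffer is full is just one possible policy; a plain blocking send would instead make the producer wait:

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the job
// was accepted. It returns false when the buffer is full and no
// receiver is waiting.
func tryEnqueue(jobs chan<- string, job string) bool {
    select {
    case jobs <- job:
        return true
    default:
        return false
    }
}

func main() {
    const limit = 2 // assumed queue size, for illustration only
    jobs := make(chan string, limit)

    // Nothing is receiving yet, so only the first two jobs fit.
    for _, j := range []string{"a", "b", "c"} {
        fmt.Println(j, tryEnqueue(jobs, j))
    }
}
```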
Circular Logic
Who "knows" when your program is done? main() provides the initial input, but you close all 3 channels in dispatch():
close(jobchan)
close(dispatchchan)
close(mw)
Your workers write to their own job queue so only when the workers are done writing to it can the incoming job queue be closed. However, individual workers also don't know when to close the jobs queue because other workers are writing to it. Nobody knows when your algorithm is done. There's your circular logic.
The mw channel is a buffered channel the same length as the number of worker go routines. It acts as a semaphore for the worker pool.
There's a race condition here. Consider the case where all n workers have just received the last n jobs. They've each read from jobschan and they're checking the value of ok. dispatcher proceeds to run its select. Nobody is writing to dispatchchan or reading from jobschan right now so the default case is immediately matched. len(stack) is 0 and no worker has reported work on mw yet, so acount is 0 and dispatcher closes all channels including mw. At some point thereafter, a worker tries to write to a closed channel and panics.
So finally I'm ready to provide some code, but I have one more problem: I don't have a clear problem statement to write code around.
I'm just getting into concurrency in Go and trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel.
Channels between goroutines are like the teeth that synchronize gears. But to what end do the gears turn? You're not trying to keep time, nor construct a wind-up toy. Your gears could be made to turn, but what would success look like? Their turning?
Let's try to define a more specific use case for channels: given an arbitrarily long set of durations coming in as strings on standard input*, sleep for each duration in one of n workers. So that we actually have a result to return, we'll say each worker will return the start and end times of each sleep.
* So that it can run in the playground, I'll simulate standard input with a hard-coded byte buffer.
package main

import (
    "bufio"
    "bytes"
    "fmt"
    "os"
    "strings"
    "sync"
    "time"
)

type SleepResult struct {
    worker_id int
    duration  time.Duration
    start     time.Time
    end       time.Time
}

func main() {
    var num_workers = 2
    workchan := make(chan time.Duration)
    resultschan := make(chan SleepResult)
    var wg sync.WaitGroup
    var resultswg sync.WaitGroup
    resultswg.Add(1)
    go results(&resultswg, resultschan)
    for i := 0; i < num_workers; i++ {
        wg.Add(1)
        go worker(i, &wg, workchan, resultschan)
    }
    // playground doesn't have stdin
    var input = bytes.NewBufferString(
        strings.Join([]string{
            "3ms",
            "1 seconds",
            "3600ms",
            "300 ms",
            "5s",
            "0.05min"}, "\n") + "\n")
    var scanner = bufio.NewScanner(input)
    for scanner.Scan() {
        text := scanner.Text()
        if dur, err := time.ParseDuration(text); err != nil {
            fmt.Fprintln(os.Stderr, "Invalid duration", text)
        } else {
            workchan <- dur
        }
    }
    close(workchan) // we know when our inputs are done
    wg.Wait()       // and when our jobs are done
    close(resultschan)
    resultswg.Wait()
}

func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) {
    for res := range resultschan {
        fmt.Printf("Worker %d: %s : %s => %s\n",
            res.worker_id, res.duration,
            res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
    }
    wg.Done()
}

func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) {
    var res = SleepResult{worker_id: id}
    for dur := range jobchan {
        res.duration = dur
        res.start = time.Now()
        time.Sleep(res.duration)
        res.end = time.Now()
        resultschan <- res
    }
    wg.Done()
}
Here I use 2 wait groups, one for the workers, one for the results. This makes sure I'm done writing all the results before main() ends. I keep my functions simple by having each function do exactly one thing at a time: main reads inputs, parses durations from them, and sends them off to the next worker. The results function collects results and prints them to standard output. The worker does the sleeping, reading from jobchan and writing to resultschan.
workchan can be buffered (or not, as in this case); it doesn't matter because the input will be read at the rate it can be processed. We can buffer as much input as we want, but we can't buffer an infinite amount. I've set channel sizes as big as 1e6, but a million is a lot less than infinite. For my use case, I don't need to do any buffering at all.
main knows when the input is done and can close the jobs channel (workchan). main also knows when jobs are done (wg.Wait()) and can close the results channel. Closing these channels is an important signal to the worker and results goroutines: they can distinguish between a channel that is empty and a channel that is guaranteed not to have any new additions.
for job := range jobchan {...} is shorthand for your more verbose:
for {
    job, ok := <-jobchan
    if !ok {
        wg.Done()
        return
    }
    ...
}
Note that this code creates 2 workers, but it could create 20 or 2000, or even 1. The program functions regardless of how many workers are in the pool. It can handle any volume of input (though interminable input of course leads to an interminable program). It does not create a cyclic loop of output to input. If your use case requires jobs to create more jobs, that's a more challenging scenario that can typically be avoided with careful planning.
I hope this gives you some good ideas about how you can better use concurrency in your Go applications.