Good day,
I'm trying to implement the correct delay between the execution of workers, for example, it is necessary for the workers to complete 30 tasks and go to sleep for 5 seconds, how can I track in the code that exactly 30 tasks have been completed and only after that go to sleep for 5 seconds?
Below is the code that creates a pool of 30 workers, who, in turn, perform tasks of 30 pieces at a time in an unordered manner, here is the code:
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum = digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i {
if i != 0 && i0 == 0 {
fmt.Printf("SLEEPAGE 5 sec...")
time.Sleep(10 * time.Second)
}
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 30
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
play: https://go.dev/play/p/lehl7hoo-kp
It is not clear exactly how to make sure that 30 tasks are completed and where to insert the delay, I will be grateful for any help
CodePudding user response:
Okey, so let's start with this working example:
func Test_t(t *testing.T) {
// just a published, this publishes result on a chan
publish := func(s int, ch chan int, wg *sync.WaitGroup) {
ch <- s // this is blocking!!!
wg.Done()
}
wg := &sync.WaitGroup{}
wg.Add(100)
// we'll use done channel to notify the work is done
res := make(chan int)
done := make(chan struct{})
// create worker that will notify that all results were published
go func() {
wg.Wait()
done <- struct{}{}
}()
// let's create a jobs that publish on our res chan
// please note all goroutines are created immediately
for i := 0; i < 100; i {
go publish(i, res, wg)
}
// lets get 30 args and then wait
var resCounter int
forloop:
for {
select {
case ss := <-res:
println(ss)
resCounter = 1
// break the loop
if resCounter0 == 0 {
// after receiving 30 results we are blocking this thread
// no more results will be taken from the channel for 5 seconds
println("received 30 results, waiting...")
time.Sleep(5 * time.Second)
}
case <-done:
// we are done here, let's break this infinite loop
break forloop
}
}
}
I hope this shows moreover how it can be done.
So, what's the problem with your code? To be honest, it looks fine (I mean 30 results are published, then the code wait, then another 30 results, etc.), but the question is where would you like to wait?
There are a few possibilities I guess:
creating workers (this is how your code works now, as I see, it publishes jobs in 30-packs; please notice that the 2-second delay you have in the
digit
function is applicable only to the goroutine the code is executed)triggering workers (so the "wait" code should be in worker function, not allowing to run more workers - so it must watch how many results were published)
handling results (this is how my code works and proper synchronization is in the
forloop
)