I am trying to simulate a program that needs to perform X tasks in N seconds and discard any other work requests.
I am using a timer with select in an infinite loop. When a value is received on <-timer.C, I run the simulated task with all the data in the slice (which is guarded against a maxLimit) and reset the timer to its initial duration.
Here is the code:
type Pool struct {
    // The maximum number of items allowed in pool
    maxSize int
    // the queue to hold the data
    queue []interface{}
    // time window for this pool
    time time.Duration
    // timer for the batch
    timer *time.Timer
    // channel
    ch chan interface{}
}

func NewPool(maxSize int, t int32) *Pool {
    p := &Pool{
        maxSize: maxSize,
        size:    0,
        queue:   make([]interface{}, maxSize),
        time:    time.Duration(t * int32(time.Second)),
        timer:   time.NewTimer(time.Duration(t) * time.Second),
        ch:      make(chan interface{}),
    }
    go p.Schedule()
    return p
}

func (p *Pool) Add(ele interface{}) {
    p.ch <- ele
}

func (p *Pool) Schedule() {
    for {
        select {
        case <-p.timer.C:
            fmt.Println("Time is over")
            p.queue = make([]interface{}, 0)
            p.timer.Reset(p.time)
            p.flush()
        case data := <-p.ch:
            if len(p.queue) < p.maxSize {
                fmt.Println("Addding")
                p.queue = append(p.queue, data)
            }
            //p.flush()
            if !p.timer.Stop() {
                <-p.timer.C
            }
            p.queue = make([]interface{}, 0)
        }
    }
}

func (p *Pool) flush() {
    for _, t := range p.queue {
        fmt.Println("simulate some work here", t)
        time.Sleep(500 * time.Millisecond)
    }
}

func main() {
    p := NewPool(5, 20)
    for i := 0; i < 10000; i++ {
        p.Add("xyz " + fmt.Sprint(i))
    }
}
But this is not working as expected, and I am missing a few things here. Can you guide me to a concurrency pattern that I can use for this requirement? Thanks.
CodePudding user response:
Adding some additional output can help you locate the issue:
    case data := <-p.ch:
        fmt.Println("Got Data", len(p.queue), p.maxSize)
        if len(p.queue) < p.maxSize {
            p.queue = append(p.queue, data)
        }
        if !p.timer.Stop() {
            fmt.Println("Draining")
            <-p.timer.C
        }
        p.queue = make([]interface{}, 0)
    }
Running with that change the output is:
Got Data 5 5
Got Data 0 5
Addding
Draining
So two messages are being processed (the first will not output Adding because len(p.queue) is 5; this is because you initialise it with a size of five - make([]interface{}, maxSize)). Consider what the code is doing when a message is received:
- Deal with the queue
- Stop the timer
- Remake the queue
Now from the docs for timer.Stop():
Stop prevents the Timer from firing. It returns true if the call stops the timer, false if the timer has already expired or been stopped.
On the first iteration this works (the timer is stopped and, if necessary, the channel drained). However, you do not reset the timer after stopping it, so on the second iteration the timer is already stopped when you call p.timer.Stop(). This means that Stop() will return false (the timer is already stopped!) and you try to drain the channel (but as the timer was already stopped, nothing will ever be sent on it and the receive blocks forever).
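The return values described above can be checked in isolation. The following is a minimal sketch, not part of the original code; the helper name stopTwice is made up for illustration, and the one-hour duration is only there so the timer cannot fire during the demo:

```go
package main

import (
	"fmt"
	"time"
)

// stopTwice (hypothetical helper) stops a fresh timer twice and
// reports what each call to Stop returned.
func stopTwice() (first, second bool) {
	t := time.NewTimer(time.Hour) // long enough that it cannot expire here
	first = t.Stop()              // timer was active, so this call stops it
	second = t.Stop()             // timer is already stopped
	// Nothing will ever be sent on t.C now, so an unconditional
	// <-t.C after the second Stop would block forever.
	return first, second
}

func main() {
	first, second := stopTwice()
	fmt.Println(first, second) // prints "true false"
}
```

This is the same sequence the Schedule loop goes through on its first two messages: the second Stop() returns false, and the drain that follows has nothing to receive.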
How you fix this depends upon your aim; I suspect that you meant to reset the timer? If not you could do something like this:
    if p.timer.C != nil && !p.timer.Stop() {
        fmt.Println("Draining")
        <-p.timer.C
    }
    p.timer.C = nil // Timer has been stopped (nil channel will block forever)