Home > Software engineering >  How can I ensure that a spawned goroutine finishes processing an array on program termination
How can I ensure that a spawned goroutine finishes processing an array on program termination

Time:12-30

I am processing records from a Kafka topic. The endpoint I need to send these records to accepts an array of up to 100 records. The Kafka records also contain the information needed to perform the REST call (currently only 1 or 2 variations, but this will increase as more record types are processed). I currently load a slice of structs holding the unique configs as they are found, and each of these configs has its own queue slice. For each config, I spawn a new goroutine that processes any records in its queue on a timer (for example, every 100ms). This process currently works just fine. The issue I am having is when the program shuts down: I do not want to leave any unsent records in the queue, and I want to finish processing them before the app shuts down. The current code below handles the interrupt and starts checking the queue depths, but once the interrupt happens, the queue count never decreases, so the program never terminates. Any thoughts would be appreciated.

package main

import (
    "context"
    "encoding/json"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"
    _ "time/tzdata"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

// ChannelDetails is the per-channel batching state kept in the channels slice:
// the channel's identity, its pending messages, and the bookkeeping used by the
// process goroutine that drains them.
type ChannelDetails struct {
    // ChannelDetails identifies the channel; find matches on its Id and MntDate.
    ChannelDetails MsgChannel
    // LastUsed is set by load on every enqueue and read by process to decide
    // when an empty queue has been idle long enough to stop.
    LastUsed       time.Time
    // Active is set to true by load when it starts a process goroutine and set
    // back to false by process just before that goroutine exits.
    Active         bool
    // Queue holds the messages waiting to be sent; load appends to it and
    // process removes batches from the front.
    Queue          []OutputMessage
}

// OutputMessage is one queued record: the routing configuration plus the
// message payload. The struct tags define the JSON shape decoded in main.
type OutputMessage struct {
    // Config carries the channel that load uses to pick the queue.
    Config  MsgConfig `json:"config"`
    // Message is the payload text.
    Message string    `json:"message"`
}

// MsgConfig is the "config" object of an OutputMessage; it currently holds
// only the destination channel.
type MsgConfig struct {
    Channel MsgChannel `json:"channel"`
}

// MsgChannel identifies a channel. find treats two channels as the same when
// both Id and MntDate are equal; Otype is not part of that comparison.
type MsgChannel struct {
    Id      int    `json:"id"`
    MntDate string `json:"mntDate"`
    Otype   string `json:"oType"`
}

// channels is the package-level list of known channels and their queues.
// load appends to it, find and checkQueueDepths scan it, and each process
// goroutine is handed a pointer to one of its elements.
// NOTE(review): no mutex or other synchronization is visible around this
// slice even though several goroutines touch it — confirm and guard if needed.
var channels []ChannelDetails

// checkQueueDepths returns the total number of messages still queued across
// every entry in channels. It returns 0 when there are no channels.
func checkQueueDepths() int {
    depth := 0
    for i := range channels {
        // Accumulate: a plain assignment here would report only the length of
        // the last queue and hide messages still pending in earlier ones.
        depth += len(channels[i].Queue)
    }
    return depth
}

// TimeIn returns t expressed in the time zone called name.
//
// name is resolved with time.LoadLocation. When the lookup fails, t is
// returned unchanged together with the lookup error.
func TimeIn(t time.Time, name string) (time.Time, error) {
    zone, lookupErr := time.LoadLocation(name)
    if lookupErr != nil {
        return t, lookupErr
    }
    return t.In(zone), nil
}

// find returns the index in channels of the first entry whose Id and MntDate
// both equal those of channel. When no entry matches it returns len(channels),
// which callers use as the "not found" marker.
func find(channel *MsgChannel) int {
    for idx := range channels {
        known := &channels[idx].ChannelDetails
        if known.Id != channel.Id || known.MntDate != channel.MntDate {
            continue
        }
        return idx
    }
    return len(channels)
}

func splice(queue []OutputMessage, count int) (ret []OutputMessage, deleted []OutputMessage) {
    ret = make([]OutputMessage, len(queue)-count)
    deleted = make([]OutputMessage, count)
    copy(deleted, queue[0:count])
    copy(ret, queue[:0])
    copy(ret[0:], queue[0 count:])
    return
}

// load enqueues msg on the queue of its channel. A channel seen for the first
// time is appended to channels, and a process goroutine is started for the
// channel whenever none is currently marked Active.
//
// NOTE(review): the goroutine is handed a pointer into the channels slice. An
// append that grows the slice copies the elements to a new backing array, so a
// goroutine started earlier may keep working on the old copy while new
// messages are added to the new one. Storing *ChannelDetails in channels would
// keep the pointers stable — confirm and fix together with the declaration.
func load(msg OutputMessage, logger *zap.Logger) {
    idx := find(&msg.Config.Channel)

    // find reports len(channels) when the channel is not registered yet.
    if idx == len(channels) {
        entry := ChannelDetails{
            ChannelDetails: msg.Config.Channel,
            LastUsed:       time.Now(),
            Queue:          make([]OutputMessage, 0, 200),
        }
        channels = append(channels, entry)
    }

    ch := &channels[idx]
    ch.LastUsed = time.Now()
    ch.Queue = append(ch.Queue, msg)

    if ch.Active {
        return
    }
    ch.Active = true
    go process(ch, logger)
}

// process drains data.Queue in batches until the channel goes idle.
//
// Each pass takes at most 100 messages off the front of the queue, logs how
// many were taken with start/end timestamps in America/New_York time, then
// sleeps for 500ms. When the queue is empty and LastUsed is more than 10
// seconds in the past, it clears data.Active and returns.
func process(data *ChannelDetails, logger *zap.Logger) {
    const (
        idleTimeout  = time.Second * 10 // reduced for example; intended value is 5 minutes
        pollInterval = time.Millisecond * time.Duration(500)
        maxBatch     = 100 // rest api endpoint can only accept array up to 100 items
        zoneName     = "America/New_York"
        stampLayout  = "2006-01-02T15:04:05.000-07:00"
    )

    for {
        // Nothing queued and nothing enqueued recently: mark the channel
        // inactive and let this goroutine end.
        if len(data.Queue) == 0 && time.Now().After(data.LastUsed.Add(idleTimeout)) {
            data.Active = false
            logger.Info("deactivating routine as queue is empty")
            return
        }

        if len(data.Queue) != 0 {
            // Zone lookup errors are ignored; TimeIn then returns the time unchanged.
            startedAt, _ := TimeIn(time.Now(), zoneName)

            batchSize := len(data.Queue)
            if batchSize > maxBatch {
                batchSize = maxBatch
            }

            remaining, batch := splice(data.Queue, batchSize)
            data.Queue = remaining
            // process batch ... will send array of items to a rest endpoint in another go routine

            finishedAt, _ := TimeIn(time.Now(), zoneName)
            logger.Info("processing records",
                zap.Int("numitems", len(batch)),
                zap.String("start", startedAt.Format(stampLayout)),
                zap.String("end", finishedAt.Format(stampLayout)),
            )
        }

        time.Sleep(pollInterval)
    }
}

// initZapLog builds a logger from zap's production config with the time field
// keyed "timestamp" and encoded via zapcore.ISO8601TimeEncoder, installs it as
// zap's global logger, and returns it.
//
// NOTE(review): the error returned by Build is discarded; presumably the
// logger is nil in that case — confirm and handle if this can fail.
func initZapLog() *zap.Logger {
    cfg := zap.NewProductionConfig()

    enc := &cfg.EncoderConfig
    enc.TimeKey = "timestamp"
    enc.EncodeTime = zapcore.ISO8601TimeEncoder

    built, _ := cfg.Build()
    zap.ReplaceGlobals(built)
    return built
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    logger := initZapLog()
    defer logger.Sync()

    test1 := `{
        "config": {
            "channel": {
                "id": 1,
                "mntDate": "2021-12-01",
                "oType": "test1"
            }
        },
        "message": "test message1"
    }`
    test2 := `{
        "config": {
            "channel": {
                "id": 2,
                "mntDate": "2021-12-01",
                "oType": "test2"
            }
        },
        "message": "test message2"
    }`
    var testMsg1 OutputMessage
    err := json.Unmarshal([]byte(test1), &testMsg1)
    if err != nil {
        logger.Panic("unable to unmarshall test1 data "   err.Error())
    }
    var testMsg2 OutputMessage
    err = json.Unmarshal([]byte(test2), &testMsg2)
    if err != nil {
        logger.Panic("unable to unmarshall test2 data "   err.Error())
    }

    exitCh := make(chan struct{})
    go func(ctx context.Context) {
        for {
            //original data is streamed from kafka
            load(testMsg1, logger)
            load(testMsg2, logger)

            time.Sleep(time.Millisecond * time.Duration(5))
            select {
            case <-ctx.Done():
                logger.Info("received done")
                var depthChk int
                for {
                    depthChk = checkQueueDepths()
                    if depthChk == 0 {
                        break
                    } else {
                        logger.Info("Still processing queues.  Msgs left: "   strconv.Itoa(depthChk))
                    }
                    time.Sleep(100 * time.Millisecond)
                }
                exitCh <- struct{}{}
                return
            default:
            }
        }
    }(ctx)

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigs
        depths := checkQueueDepths()
        logger.Info("You pressed ctrl   C. Queue depth is: "   strconv.Itoa(depths))
        cancel()
    }()
    <-exitCh
}

example logs:

{"level":"info","timestamp":"2021-12-28T15:26:06.136-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":91,"start":"2021-12-28T15:26:06.136-05:00","end":"2021-12-28T15:26:06.136-05:00"}
{"level":"info","timestamp":"2021-12-28T15:26:06.636-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":92,"start":"2021-12-28T15:26:06.636-05:00","end":"2021-12-28T15:26:06.636-05:00"}
^C{"level":"info","timestamp":"2021-12-28T15:26:06.780-0500","caller":"testgo/main.go:205","msg":"You pressed ctrl   C. Queue depth is: 2442"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:182","msg":"received done"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:189","msg":"Still processing queues.  Msgs left: 2442"} --line repeats forever

CodePudding user response:

The Go sync package (https://pkg.go.dev/sync) provides the WaitGroup type, which lets you wait for a group of goroutines to complete before the main goroutine returns.

The best usage example is in this blog post: https://go.dev/blog/pipelines

CodePudding user response:

To 'wait' for all spawned goroutines to finish from inside the main goroutine, there are two ways to do this. The simplest would be to add a

runtime.Goexit() 

to the end of your main goroutine, after <-exitCh

Simply, it does this:

"Calling Goexit from the main goroutine terminates that goroutine without func main returning. Since func main has not returned, the program continues execution of other goroutines. If all other goroutines exit, the program crashes."

The other way would be to use a WaitGroup. Think of a WaitGroup as a counter with a method that makes the program 'wait' on the line where it is called until the counter hits zero:

var wg sync.WaitGroup // declare the waitgroup

Then, before starting each goroutine that you want to wait on, you increment the WaitGroup counter:

wg.Add(1) // call this once for each spawned goroutine, before starting it

Then when you want to state that the goroutine has finished work, you call

wg.Done() // when you consider the spawned routine to be done call this

Which decrements the counter

Then where you want the code to 'wait' till the counter is zero, you add line:

wg.Wait() // wait here till counter hits zero

And the code will block until the number of goroutines counted with Add() and decremented with Done() reaches zero.

  •  Tags:  
  • go
  • Related