Home > Enterprise >  sync.Waitgroup not being respected
sync.Waitgroup not being respected

Time:11-11

I'm noticing many goroutines are still running even though the program should have waited for them all to complete. My understanding is that adding a waitgroup would fix this issue, but it did not.

func RunIntradayScanner() {
    // waitgroup for channels
    var wg sync.WaitGroup

    logrus.Info("Clearing out pattern slices...")
    var tf5 []request.StratNotification
    var tf15 []request.StratNotification
    var tf30 []request.StratNotification
    var tf60 []request.StratNotification

    // make the channel for comms to functions
    var intradayChannel = make(chan request.StratNotification)

    // range through DB table
    symbols := sources.GetSymbols()
    wg.Add(len(symbols))

    go func() {
        logrus.Info("------Waiting for workers to finish")
        wg.Wait()
        logrus.Info("------Closing intraday channel")
        close(intradayChannel)
    }()

    for _, s := range symbols {
        // wg.Add(1)
        go IntradayStratify(strings.TrimSpace(s.Symbol), intradayChannel, &wg)
        match := <-intradayChannel

        switch match.TimeFrame {
        case 5:
            tf5 = append(tf5, match)
        case 15:
            tf15 = append(tf15, match)
        case 30:
            tf30 = append(tf30, match)
        case 60:
            tf60 = append(tf60, match)
        default:
        }
    }

if len(tf5) > 0 {
        SplitUpAndSendEmbedToDiscord(5, tf5)
    }

    if len(tf15) > 0 {
        SplitUpAndSendEmbedToDiscord(15, tf15)
    }

    if len(tf30) > 0 {
        SplitUpAndSendEmbedToDiscord(30, tf30)
    }

    if len(tf60) > 0 {
        SplitUpAndSendEmbedToDiscord(60, tf60)
    }
}

// IntradayStratify - go routine to run during market hours
func IntradayStratify(ticker string, c chan request.StratNotification, wg *sync.WaitGroup) {
    defer wg.Done()

    candles := request.GetIntraday(ticker)
    for _, tf := range timeframes {
        chunkedCandles := request.DetermineTimeframes(tf, ticker, candles)
        if len(chunkedCandles) > 1 {
            highLows := request.CalculateIntraDayHighLow(chunkedCandles)
            // logrus.Infof("%s Highlows calculated: %d", ticker, len(highLows))
            // Should have more than 2 candles to start detecting patterns now
            if len(highLows) > 2 {
                bl, stratPattern := request.DetermineStratPattern(ticker, tf, highLows)
                if bl {
                    c <- stratPattern
                }
            }
        }

        // otherwise return an empty channel
        c <- request.StratNotification{}
    }

}

func main() {
  RunIntradayScanner()
}

I'm expecting the program to sort of become single-threaded again after for loop ranging through symbols. Instead, stdout looks like below, which looks like goroutines are still returning. The outcome should be that every line that says "Pattern X-X found for timeframe" would also have a corresponding "Sending to discord" output line.

...
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for CRM"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="SNAP Pattern 3-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for EBAY"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for MRVL"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Sending to discord: \n**SPY** :green_circle: $467.16  :red_circle: $466.92"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for CVS"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for QCOM"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Getting intraday data for ZM"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="X Pattern 3-2D found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="SQ Pattern 2D-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="MSFT Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="NVDA Pattern 2D-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="PTON Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="MARA Pattern 2U-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="COIN Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="ROKU Pattern 1-2D found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="SHOP Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="PFE Pattern 3-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="RBLX Pattern 1-2U found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="AFRM Pattern 2D-1 found for timeframe: 5!"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Sending to discord: \n**SNAP** :green_circle: $54.71  :red_circle: $54.59"
strat_1  | time="2021-11-09T20:54:57Z" level=info msg="Done with Intraday scanner"

CodePudding user response:

The original code blocks after every start of a go routine by waiting for a value to be sent over the non buffered channel, in addition the channel is closed when the WaitGroup is count down, this also closed the channel for the receiving side.

Imho a general rule is:

Do not close a channel from the receiver side and do not close a channel if the channel has multiple concurrent senders.

package main

import (
    "fmt"
    "strings"
)

type StratNotification struct {
    Symbol string
}

func GetSymbols() []StratNotification {
    return []StratNotification{
        {Symbol: "a"},
        {Symbol: "b"},
        {Symbol: "c"},
        {Symbol: "d"},
    }
}

func RunIntradayScanner() {
    symbols := GetSymbols()
    var intradayChannel = make(chan StratNotification)
    for _, s := range symbols {
        go IntradayStratify(strings.TrimSpace(s.Symbol), intradayChannel)
    }

    for _ = range symbols {
        s := <-intradayChannel
        fmt.Println(s)
    }
}

func IntradayStratify(ticker string, c chan StratNotification) {
    // do some heavy lifting
    fmt.Println(ticker)
    c <- StratNotification{}
}

func main() {
    RunIntradayScanner()
}
  •  Tags:  
  • go
  • Related