Streaming values from websocket, determining if I am lagging behind processing the data

Time:09-29

I am connecting to a websocket that streams live stock trades.
I have to read the prices, perform calculations on the fly and based on these calculations make another API call e.g. buy or sell.

I want to ensure my calculations/processing doesn't slow down my ability to stream in all the live data.

What is a good design pattern to follow for this type of problem? Is there a way to log/warn in my system to know if I am falling behind?

Falling behind means: the websocket is sending price data, and I cannot process it as fast as it arrives, so my processing lags behind the live feed. I see two possible sources of delay:

  1. Calling c.ReadJSON before passing the message to my channel deserializes the JSON inside the read loop, which might slow down reading.

  2. The goroutine that receives from my channel calculates formulas and sends another API request to buy/sell, which will add delays.

How can I prevent lags/delays and also monitor if indeed there is a delay?

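package main

// Import paths are assumed from the calls used below
// (websocket.DefaultDialer, c.ReadJSON, logrus.Info).
import (
    "github.com/gorilla/websocket"
    "github.com/sirupsen/logrus"
)

func main() {
    c, _, err := websocket.DefaultDialer.Dial("wss://socket.example.com/stocks", nil)
    if err != nil {
        panic(err)
    }
    defer c.Close()

    // Buffered channel to account for bursts or spikes in data:
    chanMessages := make(chan interface{}, 10000)

    // Read messages off the buffered queue:
    go func() {
        // Values on this channel are already decoded by ReadJSON, not raw bytes.
        for msg := range chanMessages {
            logrus.Info("Message: ", msg)
        }
    }()

    // As little logic as possible in the reader loop:
    for {
        var msg interface{}
        if err := c.ReadJSON(&msg); err != nil {
            panic(err)
        }
        // Blocks once the buffer is full, which stalls reading from the socket.
        chanMessages <- msg
    }
}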

CodePudding user response:

You can read bytes, pass them to the channel, and use other goroutines to do conversion.
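
A minimal sketch of that idea, using only the standard library so it runs without a live socket. The Trade struct, decodeWorkers, and backlogRatio are hypothetical names for illustration, and the JSON field names are assumptions because the thread does not show the feed's message format. In a real program the reader loop would push the bytes returned by c.ReadMessage into the raw channel.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Trade is a hypothetical message shape; the real feed's fields are unknown.
type Trade struct {
	Symbol string  `json:"symbol"`
	Price  float64 `json:"price"`
}

// decodeWorkers starts n goroutines that unmarshal raw frames from in and
// send the decoded trades on the returned channel. The returned channel is
// closed once in has been closed and fully drained.
func decodeWorkers(n int, in <-chan []byte) <-chan Trade {
	out := make(chan Trade, cap(in))
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for raw := range in {
				var t Trade
				if err := json.Unmarshal(raw, &t); err != nil {
					continue // this sketch simply skips malformed frames
				}
				out <- t
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// backlogRatio reports how full a buffered channel is, from 0.0 to 1.0.
// A value that keeps rising suggests the consumers are falling behind.
func backlogRatio(ch chan []byte) float64 {
	if cap(ch) == 0 {
		return 0
	}
	return float64(len(ch)) / float64(cap(ch))
}

func main() {
	raw := make(chan []byte, 8)
	raw <- []byte(`{"symbol":"ABC","price":10.5}`)
	raw <- []byte(`{"symbol":"XYZ","price":20}`)
	fmt.Printf("backlog: %.2f\n", backlogRatio(raw))
	close(raw)
	for t := range decodeWorkers(2, raw) {
		fmt.Println(t.Symbol, t.Price)
	}
}
```

With more than one worker the decoded trades can arrive out of order, so a single worker is the safer choice when the calculations depend on price sequence. Logging backlogRatio periodically, or whenever it crosses a threshold you choose, is one way to get a warning that processing is lagging.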

CodePudding user response:

I worked on a similar crypto market bot. Instead of creating a large buffered channel, I created a buffered channel with a capacity of 1 and used a select statement to send socket data to the channel.

Here is an example:

var wg sync.WaitGroup
msg := make(chan []byte, 1)

wg.Add(1)
go func() {
    defer wg.Done()
    for data := range msg {
        // decode and process data
    }
}()

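for {
    _, data, err := c.ReadMessage()
    if err != nil {
        log.Println("read error: ", err)
        close(msg) // lets the consumer's range loop end
        wg.Wait()  // wait for the message in flight to finish
        return
    }
    select {
    case msg <- data: // channel has room: hand the message to the consumer
    default: // channel is full: drop this message and keep reading
    }
}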

This ensures the reader is never blocked by slow processing: messages that arrive while the consumer is busy are dropped, so you work on recent data instead of a growing backlog.
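
A variant of the pattern above, sketched under assumptions: when the capacity-1 channel is full, a non-blocking send discards the incoming message and leaves the older one in the buffer. If you would rather discard the older message, the producer can evict it before sending. The names offerLatest and Dropped are hypothetical, and the sketch assumes a single producer goroutine. The drop counter also gives a simple signal for how often processing falls behind.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dropped counts messages discarded because the consumer was busy.
var dropped int64

// Dropped returns the number of messages discarded so far.
func Dropped() int64 { return atomic.LoadInt64(&dropped) }

// offerLatest puts data into a capacity-1 channel without blocking.
// If an older value is still waiting, it is evicted so the consumer
// sees the newest message. Assumes exactly one producer goroutine.
func offerLatest(ch chan []byte, data []byte) {
	for {
		select {
		case ch <- data:
			return
		default:
		}
		// Channel is full: evict the stale value, then retry the send.
		select {
		case <-ch:
			atomic.AddInt64(&dropped, 1)
		default: // the consumer took it in the meantime
		}
	}
}

func main() {
	ch := make(chan []byte, 1)
	for _, p := range []string{"100.1", "100.2", "100.3"} {
		offerLatest(ch, []byte(p))
	}
	fmt.Println(string(<-ch), Dropped()) // prints "100.3 2"
}
```

In the reader loop, offerLatest(msg, data) would take the place of the select statement. Whether dropping messages is acceptable depends on the strategy: it suits logic that only needs the current price, not logic that must see every trade.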

Tags: go