Home > Mobile >  Reconnect to a websocket server if a condition is met
Reconnect to a websocket server if a condition is met

Time:09-23

I want to stream real-time data of the order book of BTC/USD using a WebSocket connection to the FTX Exchange. After the first snapshot of the order book, the WebSocket returns updates that I apply to my local reconstructed order book. To ensure that my order book is synchronized, I have to check a crc32 integer after every update. If this number matches the checksum in the message, I can be confident that my order book is well synchronized. However, sometimes the checksum is not right, and I need to reset the connection, i.e. unsubscribe to the channel and subscribe right after. At the end of the on_messagefunction, I check if the checksum is successful. If it is not, I close the connection but I would like to clear the memory (i.e. the global objects asks, bids, checksum) and reconnect right after. How can I do it?

Should I just add a while True loop at the end? Like this one?

while True:
    ws.run_forever()

I don't like this while Truesolution because it's impossible for me to stop it, I have to quit the terminal.

My code is the following

import websocket,json
import zlib
from decimal import Decimal
import binascii
from itertools import chain, zip_longest
from typing import Iterable, Sequence

asks = {}
bids = {}
checksum = {'checksum':0}
def format_e(dec):
    return ('{:.'   str(len(dec.as_tuple().digits) - 1)   'e}').format(dec)

def check_sum(
    asks: Iterable[Sequence[float]], bids: Iterable[Sequence[float]]
) -> int:
    asks=[[level[0],level[1]]for level in asks.items()]
    bids=[[level[0],level[1]]for level in bids.items()]

    order_book_hash_iterator = zip_longest(bids, asks, fillvalue=tuple())
    check_string = ":".join(
        (
            str(token)
            for ask_level, bid_level in order_book_hash_iterator
            for token in chain(ask_level, bid_level)
        )
    )

    return binascii.crc32(check_string.encode("ascii"))
    
    
def on_open(ws):
    print('Opened connection')
    asks.clear()
    bids.clear()
    subscribe_message = {'op': 'subscribe', 'channel': 'orderbook','market':'BTC/USD'}
    ws.send(json.dumps(subscribe_message))


def on_message(ws,message):
    js=json.loads(message)
    if js['type'] == 'partial':
        print('Get Snapshot')
        for level in js['data']['asks']:
            asks[level[0]]=level[1]
        for level in js['data']['bids']:
            bids[level[0]]=level[1]
        checksum['checksum']=js['data']['checksum']
    if js['type'] == 'update':
        for level in js['data']['asks']:
            if level[1]==0:
                asks.pop(level[0])
            else:
                asks[level[0]]=level[1]
        for level in js['data']['bids']:
            if level[1]==0:
                bids.pop(level[0])
            else:
                bids[level[0]]=level[1]
    if check_sum(asks,bids) != js['data']['checksum']:
        print('Error')
        ws.close()


socket = "wss://ftx.com/ws/"
ws = websocket.WebSocketApp(socket,on_open=on_open)
ws.on_message = lambda ws,msg: on_message(ws,msg)
ws.run_forever()

CodePudding user response:

How about this:

# everything up to and including `on_open` the same


class ChecksumException(Exception):
    pass


def on_message(ws, message):
    print(message)
    js = json.loads(message)
    if js['type'] == 'partial':
        print('Get Snapshot')
        for level in js['data']['asks']:
            asks[level[0]] = level[1]
        for level in js['data']['bids']:
            bids[level[0]] = level[1]
        checksum['checksum'] = js['data']['checksum']
    if js['type'] == 'update':
        for level in js['data']['asks']:
            if level[1] == 0:
                asks.pop(level[0])
            else:
                asks[level[0]] = level[1]
        for level in js['data']['bids']:
            if level[1] == 0:
                bids.pop(level[0])
            else:
                bids[level[0]] = level[1]
    if js['type'] == 'subscribed':
        return
    # so, checking this for *any* type of message, except 'subscribed'
    if check_sum(asks, bids) != js['data']['checksum']:
        raise ChecksumException


def main():
    socket = "wss://ftx.com/ws/"
    while True:
        ws = None
        try:
            ws = websocket.WebSocketApp(socket, on_open=on_open)
            ws.on_message = lambda w=ws, msg=None: on_message(w, msg)
            print('Connecting...')
            ws.run_forever()
            print('Keyboard interrupt, stopping')
            break
        except ChecksumException:
            ws.close()
            print('Checksum error, closed')
            # no break here, so the loop continues and will reconnect


if __name__ == '__main__':
    main()

  • Related