I want to stream real-time order book data for BTC/USD
using a WebSocket connection to the FTX Exchange. After the first snapshot of the order book, the WebSocket returns updates that I apply to my locally reconstructed order book. To ensure that my order book stays synchronized, I have to compute a crc32 integer after every update. If this number matches the checksum in the message, I can be confident that my order book is correctly synchronized. However, sometimes the checksum does not match, and I need to reset the connection, i.e. unsubscribe from the channel and subscribe again right after.
At the end of the `on_message` function, I check whether the checksum matches. If it does not, I close the connection, but I would also like to clear the memory (i.e. the global objects `asks`, `bids`, and `checksum`) and reconnect right after. How can I do it?
Should I just add a `while True` loop at the end, like this one?
# Naive reconnect loop: start a new session every time run_forever() returns.
while True:
    ws.run_forever()
I don't like this `while True` solution because it is impossible for me to stop it; I have to quit the terminal.
My code is the following:
import websocket,json
import zlib
from decimal import Decimal
import binascii
from itertools import chain, zip_longest
from typing import Iterable, Sequence
# Local order book, one dict per side, mapping price -> size.
asks, bids = {}, {}
# Checksum taken from the most recent snapshot message.
checksum = {'checksum': 0}
def format_e(dec):
    """Format a Decimal in exponent notation, keeping all of its digits.

    The precision is derived from the number of digits stored in ``dec`` so
    nothing is rounded away, e.g. ``Decimal("0.00001")`` becomes ``"1e-5"``.

    Args:
        dec: The ``decimal.Decimal`` value to format.

    Returns:
        The value as a string in exponent notation.
    """
    # One digit sits before the decimal point; the remaining ones follow it.
    precision = len(dec.as_tuple().digits) - 1
    return ('{:.' + str(precision) + 'e}').format(dec)
def check_sum(asks: dict, bids: dict, depth: int = 100) -> int:
    """Compute the CRC32 checksum of the local order book.

    The checksum string interleaves the two sides best level first:
    ``bid_price:bid_size:ask_price:ask_size:...``. When one side is shorter
    than the other, the remaining levels of the longer side are appended on
    their own.

    Args:
        asks: Ask side of the book, mapping price -> size.
        bids: Bid side of the book, mapping price -> size.
        depth: Maximum number of levels per side included in the checksum.
            The default of 100 follows the exchange's documented checksum
            depth (NOTE(review): confirm against the FTX API docs).

    Returns:
        The CRC32 of the checksum string as an unsigned integer.
    """
    # Dicts keep insertion order, and prices added by later updates land at
    # the end, so each side must be sorted explicitly: lowest ask first,
    # highest bid first.
    best_asks = sorted(asks.items())[:depth]
    best_bids = sorted(bids.items(), reverse=True)[:depth]
    paired_levels = zip_longest(best_bids, best_asks, fillvalue=())
    check_string = ":".join(
        str(token)
        for bid_level, ask_level in paired_levels
        for token in chain(bid_level, ask_level)
    )
    return binascii.crc32(check_string.encode("ascii"))
def on_open(ws):
    """Reset the local book and subscribe to the BTC/USD order book channel.

    Args:
        ws: The WebSocketApp whose connection has just been opened.
    """
    print('Opened connection')
    # Start every connection from an empty book.
    for side in (asks, bids):
        side.clear()
    payload = json.dumps(
        {'op': 'subscribe', 'channel': 'orderbook', 'market': 'BTC/USD'}
    )
    ws.send(payload)
def on_message(ws, message):
    """Apply one order-book message to the local book and verify its checksum.

    A 'partial' message replaces the book with the snapshot it carries; an
    'update' message sets each listed price level, or removes the level when
    its size is 0. After either one, the CRC32 of the local book is compared
    with the checksum carried by the message and the connection is closed on
    a mismatch.

    Args:
        ws: The WebSocketApp the message arrived on; closed on a mismatch.
        message: Raw JSON text received from the socket.
    """
    js = json.loads(message)
    kind = js.get('type')
    if kind not in ('partial', 'update'):
        # Other message types (e.g. a subscription acknowledgement) are
        # assumed to carry no book data, so there is nothing to verify.
        return
    data = js['data']
    if kind == 'partial':
        print('Get Snapshot')
        # A snapshot is the whole book: drop anything left from before.
        asks.clear()
        bids.clear()
        for level in data['asks']:
            asks[level[0]] = level[1]
        for level in data['bids']:
            bids[level[0]] = level[1]
        checksum['checksum'] = data['checksum']
    else:
        for book, levels in ((asks, data['asks']), (bids, data['bids'])):
            for level in levels:
                if level[1] == 0:
                    # Size 0 deletes the level; tolerate one that is already
                    # gone instead of raising KeyError.
                    book.pop(level[0], None)
                else:
                    book[level[0]] = level[1]
    if check_sum(asks, bids) != data['checksum']:
        print('Error')
        ws.close()
# FTX WebSocket endpoint.
socket = "wss://ftx.com/ws/"
# Register both callbacks when the app is built, then block in the
# library's receive loop.
ws = websocket.WebSocketApp(socket, on_open=on_open, on_message=on_message)
ws.run_forever()
CodePudding user response:
How about this:
# everything up to and including `on_open` the same
class ChecksumException(Exception):
    """Raised when the local order book's checksum does not match the message's."""
def on_message(ws, message):
    """Apply one order-book message to the local book and verify its checksum.

    A 'partial' message replaces the book with the snapshot it carries; an
    'update' message sets each listed price level, or removes the level when
    its size is 0. After either one, the CRC32 of the local book is compared
    with the checksum carried by the message.

    Args:
        ws: The WebSocketApp the message arrived on (unused here).
        message: Raw JSON text received from the socket.

    Raises:
        ChecksumException: If the local book's checksum differs from the
            one in the message.
    """
    print(message)
    js = json.loads(message)
    kind = js.get('type')
    if kind not in ('partial', 'update'):
        # 'subscribed' and any other non-book message is assumed to carry no
        # book data, so there is nothing to apply or verify.
        return
    data = js['data']
    if kind == 'partial':
        print('Get Snapshot')
        # A snapshot is the whole book: drop anything left from before.
        asks.clear()
        bids.clear()
        for level in data['asks']:
            asks[level[0]] = level[1]
        for level in data['bids']:
            bids[level[0]] = level[1]
        checksum['checksum'] = data['checksum']
    else:
        for book, levels in ((asks, data['asks']), (bids, data['bids'])):
            for level in levels:
                if level[1] == 0:
                    # Size 0 deletes the level; tolerate one that is already
                    # gone instead of raising KeyError.
                    book.pop(level[0], None)
                else:
                    book[level[0]] = level[1]
    actual = check_sum(asks, bids)
    expected = data['checksum']
    if actual != expected:
        raise ChecksumException(
            'expected {}, computed {}'.format(expected, actual)
        )
def main():
    """Stream the order book, reconnecting whenever a checksum check fails.

    Each pass of the loop builds a fresh WebSocketApp and blocks in
    ``run_forever()``. When the session ends because ``on_message`` raised
    ``ChecksumException``, the loop starts a new connection; when it ends for
    any other reason (including Ctrl-C), the function returns.
    """
    socket = "wss://ftx.com/ws/"
    while True:
        checksum_failed = False

        def handle_message(w, msg):
            # websocket-client may catch exceptions raised inside callbacks
            # (behaviour depends on the library version), so the failure is
            # recorded here instead of relying on it escaping run_forever().
            nonlocal checksum_failed
            try:
                on_message(w, msg)
            except ChecksumException:
                checksum_failed = True
                w.close()

        ws = websocket.WebSocketApp(
            socket, on_open=on_open, on_message=handle_message
        )
        print('Connecting...')
        try:
            ws.run_forever()
        except KeyboardInterrupt:
            # Some library versions re-raise Ctrl-C, others swallow it and
            # simply return; both paths end the loop.
            ws.close()
            print('Keyboard interrupt, stopping')
            break
        if not checksum_failed:
            print('Connection closed, stopping')
            break
        print('Checksum error, closed')
        # no break here, so the loop continues and will reconnect
# Start streaming only when run as a script, not when imported.
if __name__ == '__main__':
    main()