I'm trying to stream BTC kline data from a web API and then run a calculation on the data for every interval of the stream, which is around 2 seconds. The calculation takes longer than 2 seconds, so I need to use multiprocessing to spawn workers for it. I'm new to programming and this is way above my programming knowledge, so any help would be appreciated.
Currently, the code looks something like this:
import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os
class BinanceKlineStream(): # kline subscription
    """Subscribe to Binance's BTC/USDT 1-minute kline websocket stream.

    For every message, the kline fields are stored in a one-row DataFrame
    (self.kline_current) and a child process running get_kline_combined is
    started and waited for.

    NOTE(review): this is the failing version; see the review notes in
    thread_stream and on_message.
    """

    def __init__(self, binance):
        # binance: exchange client (a ccxt.binance() object in __main__);
        # it is stored but not used anywhere in this class.
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance

    def thread_stream(self):
        """Start stream() on a background thread and return immediately."""
        worker_thread = threading.Thread(target=self.stream)
        worker_thread.start()
        # NOTE(review): the thread is never joined, so the main script returns
        # right after this call while the websocket keeps running. With the
        # "spawn" start method (Windows, per the traceback paths), each child
        # re-imports the main script; once the script has finished, the child
        # likely can no longer find BinanceKlineStream -> the AttributeError.

    def stream(self):
        """Connect to the kline websocket; messages go to on_message, errors to on_error."""
        self.ws = websocket.WebSocketApp(self.kline_sub, on_message=self.on_message, on_error=self.on_error)
        self.ws.run_forever(ping_interval=60)

    def on_message(self, ws, message):
        """Parse one kline event into self.kline_current, then run get_kline_combined in a child process."""
        json_message = json.loads(message)
        self.kline_current = pd.DataFrame([[json_message['E'], json_message['k']['o'], json_message['k']['h'], json_message['k']['l'], json_message['k']['c'], json_message['k']['v'], json_message['k']['x'], json_message['k']['t'], json_message['k']['T']]], columns=['id', 'open', 'high', 'low', 'close', 'volume', 'kline_closed', 'start_time', 'close_time'])
        print('process1: ', os.getpid())
        # spawn a new process when on_message is called
        # NOTE(review): no args are passed, so the child never sees self.kline_current.
        worker_process = multiprocessing.Process(target=type(self).get_kline_combined)
        worker_process.start()
        # NOTE(review): join() blocks this callback until the child exits
        # (about 5 s with the sleep below), so calculations run one at a time
        # and never overlap -- the extra process buys no parallelism.
        worker_process.join()

    def on_error(self, ws, error):
        # Print any error reported by the websocket.
        print(error)

    @staticmethod
    def get_kline_combined():
        """Placeholder for the calculation: print the child's PID and sleep 5 seconds."""
        print('process2: ', os.getpid())
        time.sleep(5)
if __name__ == "__main__":
    # start streaming kline information
    binance_spot = ccxt.binance()
    stream = BinanceKlineStream(binance_spot)
    # NOTE(review): thread_stream() only starts the thread and returns, so the
    # main script finishes here while the websocket thread keeps running.
    stream.thread_stream()
Error message:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\Ted Teng\AppData\Local\Programs\Python\Python310\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "C:\Users\Ted Teng\AppData\Local\Programs\Python\Python310\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'BinanceKlineStream.get_kline_combined' on <module '__main__' (built-in)>
Answer:
To answer your original question: sockets are not picklable. So if you store one as an instance attribute (self.ws holds the websocket connection here), you cannot pass an instance method as the process target: a bound method is pickled together with its instance, socket included. Therefore, you should change get_kline_combined
to a staticmethod.
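Now, to fix the AttributeError, join the streaming thread you spawned so that the main script keeps running. On Windows, multiprocessing uses the spawn start method: each child process re-imports your main script to find get_kline_combined. Once the main script has finished running, the child can no longer locate it, which is why the traceback mentions <module '__main__' (built-in)>: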
import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os
class BinanceKlineStream():  # kline subscription
    """Stream Binance BTC/USDT 1-minute klines and process them in worker processes.

    Every websocket message is parsed into a one-row DataFrame
    (``self.kline_current``) and handed to a ``multiprocessing.Pool`` that
    runs ``get_kline_combined`` on it. The websocket thread never waits for
    a calculation, so calculations that take longer than the ~2 s message
    interval run side by side instead of queueing behind each other.

    NOTE: if calculations stay slower than what ``processes`` workers can
    absorb, pending tasks pile up in the pool's queue; in that case consider
    skipping stale klines.
    """

    def __init__(self, binance, processes=None):
        """
        Args:
            binance: exchange client; stored on the instance, not used by
                this class.
            processes: number of worker processes in the calculation pool;
                None means os.cpu_count() (the multiprocessing.Pool default).
        """
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance
        self.processes = processes
        self.pool = None  # created by stream(), exists only while it runs

    def thread_stream(self):
        """Run stream() on a background thread and block until it finishes.

        The join matters on Windows, where multiprocessing uses the "spawn"
        start method: every worker re-imports the main script to find
        get_kline_combined. If the main script returned right after starting
        the thread, the workers could no longer locate it and failed with
        "AttributeError: Can't get attribute ...".
        """
        worker_thread = threading.Thread(target=self.stream)
        worker_thread.start()
        worker_thread.join()

    def stream(self):
        """Open the kline websocket and serve it until the connection closes."""
        self.pool = multiprocessing.Pool(processes=self.processes)
        try:
            self.ws = websocket.WebSocketApp(self.kline_sub, on_message=self.on_message, on_error=self.on_error)
            self.ws.run_forever(ping_interval=60)
        finally:
            # close() + join() lets already-queued calculations finish. Using
            # the Pool as a context manager would call terminate() on exit and
            # kill them instead.
            self.pool.close()
            self.pool.join()
            self.pool = None

    def on_message(self, ws, message):
        """Parse one kline event and queue its calculation without waiting for it.

        Args:
            ws: the WebSocketApp that received the message (unused).
            message: raw JSON text of a Binance kline event.
        """
        json_message = json.loads(message)
        kline = json_message['k']
        self.kline_current = pd.DataFrame(
            [[json_message['E'], kline['o'], kline['h'], kline['l'], kline['c'],
              kline['v'], kline['x'], kline['t'], kline['T']]],
            columns=['id', 'open', 'high', 'low', 'close', 'volume',
                     'kline_closed', 'start_time', 'close_time'])
        print('process1: ', os.getpid())
        # apply_async returns immediately, so the next message is not held up
        # by this calculation. The DataFrame is passed explicitly because the
        # worker only receives a pickled copy of its arguments, never `self`.
        self.pool.apply_async(type(self).get_kline_combined,
                              (self.kline_current,),
                              error_callback=self._on_worker_error)

    def on_error(self, ws, error):
        """Print errors reported by the websocket."""
        print(error)

    def _on_worker_error(self, error):
        """Print an exception raised inside get_kline_combined.

        Without an error_callback, exceptions from apply_async tasks stay in
        the (never read) AsyncResult and are silently lost.
        """
        print(error)

    @staticmethod
    def get_kline_combined(kline=None):
        """Placeholder for the heavy calculation; runs in a worker process.

        Args:
            kline: the one-row DataFrame built by on_message. It defaults to
                None so the old zero-argument call still works. Put the real
                calculation here.
        """
        print('process2: ', os.getpid())
        time.sleep(5)
if __name__ == "__main__":
    # Entry point: create the ccxt Binance client, wrap it in the kline
    # streamer and run the stream. thread_stream() joins its worker thread,
    # so this call blocks while the stream is running.
    binance_spot = ccxt.binance()
    stream = BinanceKlineStream(binance=binance_spot)
    stream.thread_stream()
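One more thing: if get_kline_combined (or any other code running in a worker process) alters an instance attribute, such as
self.kline_current = ...
keep in mind that the change is local to that worker process. It won't be reflected in the main process unless you send the result back (for example, as a return value) or use shared memory. In the code above, that particular assignment happens in on_message, which runs in the main process, so it does take effect there.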