I'm trying to stream BTC kline data from a web API and run a calculation on every update from the stream, which arrives roughly every 2 seconds. The calculation takes longer than 2 seconds, so I need to use multiprocessing to spawn workers for it. I'm new to programming and this is well beyond my current knowledge, so any help would be appreciated.
currently the code looks something like this:
import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os
class BinanceKlineStream:  # kline subscription
    """Subscribe to the Binance BTC/USDT 1-minute kline websocket stream.

    Each incoming message is turned into a single-row DataFrame stored on
    ``self.kline_current``; a separate process is then started to run
    ``get_kline_combined``.
    """

    # Column labels of the one-row frame built for every message, in the same
    # order as the values picked out of the payload in ``_kline_frame``.
    _KLINE_COLUMNS = [
        'id', 'open', 'high', 'low', 'close', 'volume',
        'kline_closed', 'start_time', 'close_time',
    ]

    def __init__(self, binance):
        """Remember the websocket URL and the exchange client.

        Args:
            binance: exchange client object; it is only stored on the
                instance and not used by the methods of this class.
        """
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance

    def thread_stream(self):
        """Start ``stream`` in a new thread and return immediately.

        The thread is not joined here, so the caller continues (and may
        finish) while the websocket loop keeps running in the background.
        """
        worker_thread = threading.Thread(target=self.stream)
        worker_thread.start()

    def stream(self):
        """Open the websocket and run its receive loop (blocking call)."""
        self.ws = websocket.WebSocketApp(
            self.kline_sub,
            on_message=self.on_message,
            on_error=self.on_error,
        )
        self.ws.run_forever(ping_interval=60)

    @staticmethod
    def _kline_frame(message):
        """Decode one raw JSON kline message into a single-row DataFrame.

        Args:
            message: JSON text with a top-level ``'E'`` key and a ``'k'``
                object holding ``o, h, l, c, v, x, t, T``.

        Returns:
            A DataFrame with one row and the columns in ``_KLINE_COLUMNS``.
            Values are stored exactly as they appear in the payload.
        """
        payload = json.loads(message)
        kline = payload['k']
        row = [
            payload['E'],
            kline['o'], kline['h'], kline['l'], kline['c'], kline['v'],
            kline['x'], kline['t'], kline['T'],
        ]
        return pd.DataFrame([row], columns=BinanceKlineStream._KLINE_COLUMNS)

    def on_message(self, ws, message):
        """Websocket callback: store the latest kline and run a worker.

        Args:
            ws: the websocket app that delivered the message (unused).
            message: raw JSON text of the kline event.
        """
        self.kline_current = self._kline_frame(message)
        print('process1: ', os.getpid())
        # spawn a new process when on_message is called
        # The target is looked up on the class, so only the function
        # reference is handed to the child, not this instance.
        worker_process = multiprocessing.Process(target=type(self).get_kline_combined)
        worker_process.start()
        # join() blocks this callback until the worker has exited.
        worker_process.join()

    def on_error(self, ws, error):
        """Websocket callback: print the reported error."""
        print(error)

    @staticmethod
    def get_kline_combined():
        """Stand-in for the calculation: print the worker PID, sleep 5 s."""
        print('process2: ', os.getpid())
        time.sleep(5)
if __name__ == "__main__":
    # start streaming kline information
    # The guard keeps this from running again when the module is imported
    # rather than executed as a script.
    binance_spot = ccxt.binance()
    stream = BinanceKlineStream(binance_spot)
    stream.thread_stream()
error message:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\Ted Teng\AppData\Local\Programs\Python\Python310\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "C:\Users\Ted Teng\AppData\Local\Programs\Python\Python310\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'BinanceKlineStream.get_kline_combined' on <module '__main__' (built-in)>
CodePudding user response:
To answer your original question: sockets are not picklable, so if you store one as an instance attribute you cannot pass an instance method as the target function (pickling a bound method pickles the whole instance, socket included). Therefore, you should change get_kline_combined to a staticmethod.
Now, to fix the AttributeError, you need to join the stream thread you spawned so that the main thread waits for it:
import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os
class BinanceKlineStream:  # kline subscription
    """Subscribe to the Binance BTC/USDT 1-minute kline websocket stream.

    Each incoming message is turned into a single-row DataFrame stored on
    ``self.kline_current``; a separate process is then started to run
    ``get_kline_combined``. ``thread_stream`` blocks until the stream ends.
    """

    # Column labels of the one-row frame built for every message, in the same
    # order as the values picked out of the payload in ``_kline_frame``.
    _KLINE_COLUMNS = [
        'id', 'open', 'high', 'low', 'close', 'volume',
        'kline_closed', 'start_time', 'close_time',
    ]

    def __init__(self, binance):
        """Remember the websocket URL and the exchange client.

        Args:
            binance: exchange client object; it is only stored on the
                instance and not used by the methods of this class.
        """
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance

    def thread_stream(self):
        """Run ``stream`` in a new thread and wait for it to finish."""
        worker_thread = threading.Thread(target=self.stream)
        worker_thread.start()
        # Notice
        # Joining keeps the calling thread alive for as long as the
        # websocket loop is running.
        worker_thread.join()  # Notice
        # Notice

    def stream(self):
        """Open the websocket and run its receive loop (blocking call)."""
        self.ws = websocket.WebSocketApp(
            self.kline_sub,
            on_message=self.on_message,
            on_error=self.on_error,
        )
        self.ws.run_forever(ping_interval=60)

    @staticmethod
    def _kline_frame(message):
        """Decode one raw JSON kline message into a single-row DataFrame.

        Args:
            message: JSON text with a top-level ``'E'`` key and a ``'k'``
                object holding ``o, h, l, c, v, x, t, T``.

        Returns:
            A DataFrame with one row and the columns in ``_KLINE_COLUMNS``.
            Values are stored exactly as they appear in the payload.
        """
        payload = json.loads(message)
        kline = payload['k']
        row = [
            payload['E'],
            kline['o'], kline['h'], kline['l'], kline['c'], kline['v'],
            kline['x'], kline['t'], kline['T'],
        ]
        return pd.DataFrame([row], columns=BinanceKlineStream._KLINE_COLUMNS)

    def on_message(self, ws, message):
        """Websocket callback: store the latest kline and run a worker.

        Args:
            ws: the websocket app that delivered the message (unused).
            message: raw JSON text of the kline event.
        """
        self.kline_current = self._kline_frame(message)
        print('process1: ', os.getpid())
        # spawn a new process when on_message is called
        # The target is looked up on the class, so only the function
        # reference is handed to the child, not this instance.
        worker_process = multiprocessing.Process(target=type(self).get_kline_combined)
        worker_process.start()
        # join() blocks this callback until the worker has exited.
        worker_process.join()

    def on_error(self, ws, error):
        """Websocket callback: print the reported error."""
        print(error)

    @staticmethod
    def get_kline_combined():
        """Stand-in for the calculation: print the worker PID, sleep 5 s."""
        print('process2: ', os.getpid())
        time.sleep(5)
if __name__ == "__main__":
    # start streaming kline information
    # The guard keeps this from running again when the module is imported
    # rather than executed as a script.
    binance_spot = ccxt.binance()
    stream = BinanceKlineStream(binance_spot)
    stream.thread_stream()
One more thing: inside get_kline_combined you are altering an instance attribute:
self.kline_current = ...
Just keep in mind that any changes to the instance attributes are local to the process. So these won't be reflected in the main process unless you use shared memory.
Update
You don't need to join the processes right away; instead, add them to a list, and then you can iterate over that list to terminate or join them at your leisure. I would also recommend using a Pool instead, since if many messages come in you'll spawn a lot of processes, which might not be very efficient:
import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os
class BinanceKlineStream:  # kline subscription
    """Subscribe to the Binance BTC/USDT 1-minute kline websocket stream.

    Each incoming message is turned into a single-row DataFrame stored on
    ``self.kline_current`` and a ``get_kline_combined`` task is submitted to
    a process pool. The pending results are collected in ``self.results``.
    """

    # Column labels of the one-row frame built for every message, in the same
    # order as the values picked out of the payload in ``_kline_frame``.
    _KLINE_COLUMNS = [
        'id', 'open', 'high', 'low', 'close', 'volume',
        'kline_closed', 'start_time', 'close_time',
    ]

    def __init__(self, binance):
        """Remember the URL and client, and create the worker pool.

        Args:
            binance: exchange client object; it is only stored on the
                instance and not used by the methods of this class.
        """
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance
        self.pool = multiprocessing.Pool(processes=8)
        # One AsyncResult per submitted task, in submission order.
        self.results = []

    def thread_stream(self):
        """Run ``stream`` in a new thread, wait for it, then shut the pool.

        After this returns the pool no longer accepts tasks; results already
        stored in ``self.results`` can still be read with ``.get()``.
        """
        worker_thread = threading.Thread(target=self.stream)
        worker_thread.start()
        # Notice
        # Joining keeps the calling thread alive for as long as the
        # websocket loop is running.
        worker_thread.join()  # Notice
        # Notice
        # The stream has ended, so no more tasks will be submitted: release
        # the worker processes instead of leaving the pool open.
        self.pool.close()
        self.pool.join()

    def stream(self):
        """Open the websocket and run its receive loop (blocking call)."""
        self.ws = websocket.WebSocketApp(
            self.kline_sub,
            on_message=self.on_message,
            on_error=self.on_error,
        )
        self.ws.run_forever(ping_interval=60)

    @staticmethod
    def _kline_frame(message):
        """Decode one raw JSON kline message into a single-row DataFrame.

        Args:
            message: JSON text with a top-level ``'E'`` key and a ``'k'``
                object holding ``o, h, l, c, v, x, t, T``.

        Returns:
            A DataFrame with one row and the columns in ``_KLINE_COLUMNS``.
            Values are stored exactly as they appear in the payload.
        """
        payload = json.loads(message)
        kline = payload['k']
        row = [
            payload['E'],
            kline['o'], kline['h'], kline['l'], kline['c'], kline['v'],
            kline['x'], kline['t'], kline['T'],
        ]
        return pd.DataFrame([row], columns=BinanceKlineStream._KLINE_COLUMNS)

    def on_message(self, ws, message):
        """Websocket callback: store the latest kline and queue a task.

        Args:
            ws: the websocket app that delivered the message (unused).
            message: raw JSON text of the kline event.
        """
        self.kline_current = self._kline_frame(message)
        print('process1: ', os.getpid())
        # Hand the work to the pool; apply_async returns without waiting, so
        # this callback is not held up by the task.
        # The target is looked up on the class, so only the function
        # reference is sent to the worker, not this instance.
        async_result = self.pool.apply_async(type(self).get_kline_combined)
        self.results.append(async_result)

    def on_error(self, ws, error):
        """Websocket callback: print the reported error."""
        print(error)

    @staticmethod
    def get_kline_combined():
        """Stand-in for the calculation: print the worker PID, sleep 5 s."""
        print('process2: ', os.getpid())
        time.sleep(5)
if __name__ == "__main__":
    # start streaming kline information
    # The guard keeps this from running again when the module is imported
    # rather than executed as a script.
    binance_spot = ccxt.binance()
    stream = BinanceKlineStream(binance_spot)
    stream.thread_stream()
If you use a pool, you can get the result of a task using self.results[index].get(). You should also close the pool once you are done. Whichever way you go, you may also want to periodically iterate over self.results and remove the tasks in it that are done. If you don't want to do this in the main thread, you can spawn another thread that continuously iterates over the list and does exactly that.