Home > Software design >  Using multiprocessing and multithreading together to stream data in python
Using multiprocessing and multithreading together to stream data in python

Time:08-18

I'm trying to stream BTC kline data from a web API and then run a calculation on every update of the data stream, which arrives roughly every 2 seconds. The calculation takes longer than 2 seconds, so I need to use multiprocessing to spawn workers for it. I'm new to programming and this is well beyond my current knowledge, so any help would be appreciated.

Currently, the code looks something like this:

import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os

class BinanceKlineStream():  # kline subscription
    """Subscribe to a Binance kline websocket and start a child process per message.

    Attributes set by the visible code:
        kline_sub: websocket URL of the btcusdt 1m kline stream.
        binance: the object passed to the constructor; it is only stored here
            and not used by any method in this class.
        ws: the ``websocket.WebSocketApp`` created by ``stream``.
        kline_current: one-row DataFrame built from the latest message.
    """

    def __init__(self, binance):
        """Store the stream URL and the ``binance`` object supplied by the caller."""
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance

    def thread_stream(self):
        """Start ``stream`` on a new thread.

        The thread is started but never joined, so this method returns to the
        caller right away while the thread keeps running.
        """
        worker_thread = threading.Thread(target=self.stream)
        worker_thread.start()

    def stream(self):
        """Create the WebSocketApp with this object's callbacks and run it."""
        self.ws = websocket.WebSocketApp(self.kline_sub, on_message=self.on_message, on_error=self.on_error)
        self.ws.run_forever(ping_interval=60)

    def on_message(self, ws, message):
        """Handle one websocket message.

        Parses ``message`` as JSON, stores a one-row DataFrame in
        ``self.kline_current``, prints the current pid, then starts a child
        process and waits for it to finish.

        Args:
            ws: the websocket app invoking the callback (unused here).
            message: JSON text; the code reads the top-level key ``'E'`` and
                the keys ``o, h, l, c, v, x, t, T`` of the nested ``'k'`` object.
        """
        json_message = json.loads(message)
        # Column order mirrors the value order: E->id, k.o->open, k.h->high, k.l->low,
        # k.c->close, k.v->volume, k.x->kline_closed, k.t->start_time, k.T->close_time.
        self.kline_current = pd.DataFrame([[json_message['E'], json_message['k']['o'], json_message['k']['h'], json_message['k']['l'], json_message['k']['c'], json_message['k']['v'], json_message['k']['x'], json_message['k']['t'], json_message['k']['T']]], columns=['id', 'open', 'high', 'low', 'close', 'volume', 'kline_closed', 'start_time', 'close_time'])
        print('process1: ', os.getpid())

        # spawn a new process when on_message is called
        # The target is looked up on the class, so the instance itself is not the process target.
        worker_process = multiprocessing.Process(target=type(self).get_kline_combined)
        worker_process.start()
        # join() blocks this callback until the child process has exited.
        worker_process.join()

    def on_error(self, ws, error):
        """Print the error reported by the websocket app."""
        print(error)

    @staticmethod
    def get_kline_combined():
        """Child-process entry point: print the pid, then sleep for 5 seconds."""
        print('process2: ', os.getpid())
        time.sleep(5)


# Script entry point: only runs when the file is executed directly, not when imported.
if __name__ == "__main__":
    # start streaming kline information
    # The ccxt client is created here and handed to the stream object.
    binance_spot = ccxt.binance()
    stream = BinanceKlineStream(binance_spot)
    # thread_stream() starts the streaming thread and returns without waiting for it.
    stream.thread_stream()

Error message:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\Ted Teng\AppData\Local\Programs\Python\Python310\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\Ted Teng\AppData\Local\Programs\Python\Python310\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'BinanceKlineStream.get_kline_combined' on <module '__main__' (built-in)>

CodePudding user response:

To answer your original question: sockets are not picklable, so if you store one as an instance attribute, you cannot pass an instance method as the target function. Therefore, you should change get_kline_combined to a staticmethod.

Now, to fix the AttributeError, you need to join the thread you spawned so that the main thread waits for it:

import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os

class BinanceKlineStream():  # kline subscription
    """Listen to the Binance btcusdt 1m kline websocket and run a child process per message."""

    def __init__(self, binance):
        """Remember the stream URL and the exchange client handed in by the caller."""
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance

    def thread_stream(self):
        """Run ``stream`` on its own thread and wait until that thread ends."""
        listener = threading.Thread(target=self.stream)
        listener.start()
        # Joining keeps the calling thread blocked for as long as the listener runs.
        listener.join()

    def stream(self):
        """Build the websocket app with this object's callbacks and run it."""
        self.ws = websocket.WebSocketApp(
            self.kline_sub,
            on_message=self.on_message,
            on_error=self.on_error,
        )
        self.ws.run_forever(ping_interval=60)

    def on_message(self, ws, message):
        """Store the incoming kline as a one-row DataFrame, then run one child process.

        ``message`` is JSON text carrying a top-level ``'E'`` value and a nested
        ``'k'`` object; ``ws`` is not used.
        """
        payload = json.loads(message)
        event_id = payload['E']
        candle = payload['k']
        labels = [
            'id', 'open', 'high', 'low', 'close',
            'volume', 'kline_closed', 'start_time', 'close_time',
        ]
        values = [
            event_id,
            candle['o'],
            candle['h'],
            candle['l'],
            candle['c'],
            candle['v'],
            candle['x'],
            candle['t'],
            candle['T'],
        ]
        self.kline_current = pd.DataFrame([values], columns=labels)
        print('process1: ', os.getpid())

        # One child process per message; the callback waits for it to exit.
        child = multiprocessing.Process(target=type(self).get_kline_combined)
        child.start()
        child.join()

    def on_error(self, ws, error):
        """Print whatever error the websocket app reports."""
        print(error)

    @staticmethod
    def get_kline_combined():
        """Entry point of the child process: report its pid, then sleep 5 seconds."""
        pid = os.getpid()
        print('process2: ', pid)
        time.sleep(5)


if __name__ == "__main__":
    # Create the ccxt Binance client, wrap it in the kline stream, and start
    # listening; thread_stream() keeps this thread waiting on the stream thread.
    exchange_client = ccxt.binance()
    kline_stream = BinanceKlineStream(exchange_client)
    kline_stream.thread_stream()

One more thing, inside get_kline_combined you are altering an instance attribute:

self.kline_current = ...

Just keep in mind that any changes to the instance attributes are local to the process. So these won't be reflected in the main process unless you use shared memory.

Update

You don't need to join each process right away; instead, add them to a list, and then you can iterate over the list to terminate or join them (or whatever else you need) at your leisure. I would also recommend using a Pool instead: if many messages come in, you will spawn a large number of processes, which may not be very efficient:

import websocket
import json
import pandas as pd
import threading
import multiprocessing
import ccxt
import time
import os


class BinanceKlineStream():  # kline subscription
    """Stream btcusdt 1m klines from Binance and hand each message to a process pool.

    The pool is created in ``__init__`` and shut down by ``close``, which
    ``thread_stream`` calls once the websocket thread has ended. After that the
    pool no longer accepts tasks, so create a new instance to stream again.

    Attributes:
        kline_sub: websocket URL of the kline stream.
        binance: the object passed to the constructor (only stored here).
        pool: ``multiprocessing.Pool`` that runs ``get_kline_combined``.
        results: ``AsyncResult`` objects, one per submitted task, in arrival order.
        kline_current: one-row DataFrame built from the latest message.
    """

    def __init__(self, binance, processes=8):
        """Store the configuration and start the worker pool.

        Args:
            binance: exchange client object; kept on the instance.
            processes: number of pool worker processes (defaults to 8).
        """
        self.kline_sub = 'wss://stream.binance.com:9443/ws/btcusdt@kline_1m'
        self.binance = binance
        self.pool = multiprocessing.Pool(processes=processes)
        self.results = []

    def thread_stream(self):
        """Run ``stream`` on its own thread, wait for it to end, then shut the pool down."""
        worker_thread = threading.Thread(target=self.stream)
        worker_thread.start()
        # Block until the websocket thread is done so the main thread stays alive.
        worker_thread.join()
        # The stream is over: let queued tasks finish and release the workers
        # instead of leaving them to be killed at interpreter exit.
        self.close()

    def close(self):
        """Stop accepting tasks and wait for the already-submitted ones to finish."""
        self.pool.close()
        self.pool.join()

    def stream(self):
        """Create the WebSocketApp with this object's callbacks and run it."""
        self.ws = websocket.WebSocketApp(self.kline_sub, on_message=self.on_message, on_error=self.on_error)
        self.ws.run_forever(ping_interval=60)

    def on_message(self, ws, message):
        """Record the incoming kline and queue one calculation task on the pool.

        Args:
            ws: the websocket app invoking the callback (unused).
            message: JSON text of the kline event.
        """
        self.kline_current = self._kline_frame(json.loads(message))
        print('process1: ', os.getpid())

        # Queue the work on the pool; no new process is created per message.
        # error_callback reports failures that would otherwise stay hidden
        # until someone calls .get() on the result.
        async_result = self.pool.apply_async(
            type(self).get_kline_combined,
            error_callback=self._on_task_error,
        )
        self.results.append(async_result)

    def on_error(self, ws, error):
        """Print the error reported by the websocket app."""
        print(error)

    def _on_task_error(self, error):
        """Print an exception raised inside a pool task."""
        print('worker task failed: ', repr(error))

    @staticmethod
    def _kline_frame(json_message):
        """Build the one-row kline DataFrame from a decoded message.

        Args:
            json_message: dict with a top-level ``'E'`` value and a nested ``'k'``
                dict holding the keys ``o, h, l, c, v, x, t, T``.

        Returns:
            A DataFrame with a single row and the columns id, open, high, low,
            close, volume, kline_closed, start_time, close_time.

        Raises:
            KeyError: if any of the expected keys is missing.
        """
        kline = json_message['k']
        row = [
            json_message['E'],
            kline['o'],
            kline['h'],
            kline['l'],
            kline['c'],
            kline['v'],
            kline['x'],
            kline['t'],
            kline['T'],
        ]
        columns = [
            'id', 'open', 'high', 'low', 'close',
            'volume', 'kline_closed', 'start_time', 'close_time',
        ]
        return pd.DataFrame([row], columns=columns)

    @staticmethod
    def get_kline_combined():
        """Pool task: print the worker's pid, then sleep for 5 seconds."""
        print('process2: ', os.getpid())
        time.sleep(5)


if __name__ == "__main__":
    # Build the ccxt Binance client, wrap it in the kline stream, and start
    # streaming in one step; thread_stream() blocks until the stream thread ends.
    BinanceKlineStream(ccxt.binance()).thread_stream()

If you use a pool, you can get the result of a task using self.results[index].get(). You should also close the pool once you are done. Whichever approach you choose, you may also want to periodically iterate over self.results and remove the tasks that have finished. If you don't want to do this in the main thread, you can spawn another thread that continuously iterates over the list and does exactly that.

  • Related