Home > Enterprise >  Binance Multithread Sockets - functions not called concurrently
Binance Multithread Sockets - functions not called concurrently

Time:09-17

I have a code to receive data from binance, about current prices:

import asyncio
from binance import AsyncClient, BinanceSocketManager
import time
from datetime import datetime


def analyze(res):
    kline = res['k']

    if kline['x']: #candle is compleated
        print('{} start_sleeping {} {}'.format(
            datetime.now(),
            kline['s'],
            datetime.fromtimestamp(kline['t'] / 1000),
        ))
        time.sleep(5)
        print('{} finish_sleeping {}'.format(datetime.now(), kline['s']))


async def open_binance_stream(symbol):
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    ts = bm.kline_socket(symbol)
    async with ts as tscm:
        while True:
            res = await tscm.recv()
            analyze(res)

    await client.close_connection()


async def main():
    t1 = asyncio.create_task(open_binance_stream('ETHBTC'))
    t2 = asyncio.create_task(open_binance_stream('XRPBTC'))
    await asyncio.gather(*[t1, t2])


if __name__ == "__main__":
    asyncio.run(main())

How to make analyze function to be called concurently. Binance sends info in the same time with both streams data (ETHBTC and XRPBTC)

But function analyze will be called only once previous analyze (sleep) is completed.

enter image description here

I wish function analyze is called immediately and independently.

CodePudding user response:

Have you tried to put analyze in a thread. I think it will achieve what you want.

import asyncio
from binance import AsyncClient, BinanceSocketManager
import time
from datetime import datetime
from threading import Thread

def analyze(res):
    kline = res['k']

    if kline['x']: #candle is compleated
        print('{} start_sleeping {} {}'.format(
            datetime.now(),
            kline['s'],
            datetime.fromtimestamp(kline['t'] / 1000),
        ))
        time.sleep(5)
        print('{} finish_sleeping {}'.format(datetime.now(), kline['s']))


async def open_binance_stream(symbol):
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    ts = bm.kline_socket(symbol)
    async with ts as tscm:
        while True:
            res = await tscm.recv()
            Thread(target= analyze, args = (res)).start()

    await client.close_connection()


async def main():
    t1 = asyncio.create_task(open_binance_stream('ETHBTC'))
    t2 = asyncio.create_task(open_binance_stream('XRPBTC'))
    await asyncio.gather(*[t1, t2])


if __name__ == "__main__":
    asyncio.run(main())

This should work as expected.

  • Related