Home > Enterprise >  Alternative of CancellationToken.Register for async methods / on BackgroundService closing
Alternative to CancellationToken.Register for async methods / on BackgroundService shutdown

Time:11-13

I'm looking for an alternative to stoppingToken.Register(ShutDown) that lets me call async methods. Basically, _restClient.Spot.UserStream.StopUserStreamAsync should be awaited, but right now it isn't.

Any other suggestions are appreciated.

using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;

namespace QSGEngine.Server.Services
{
    /// <summary>
    /// Background service that opens a Binance spot user-data stream, logs the
    /// account and balance updates it receives, keeps the stream's listen key
    /// alive, and closes the listen key asynchronously when the service stops.
    /// </summary>
    public class PortfolioService : BackgroundService
    {
        // NOTE(review): credentials must not live in source. Load them from
        // configuration / a secret store before this goes anywhere near production.
        private const string ApiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
        private const string SecretKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";

        // The listen key is valid for 60 minutes, so refresh it with some margin.
        private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(50);

        private readonly ILogger<PortfolioService> _logger;
        private readonly IBinanceClient _restClient;
        private readonly IBinanceSocketClient _socketClient;

        private string? _listenKey;
        private CallResult<UpdateSubscription>? _userDataUpdateSubscription;

        /// <summary>
        /// Creates the REST and socket clients used by the service.
        /// </summary>
        /// <param name="logger">Logger used for service diagnostics.</param>
        public PortfolioService(ILogger<PortfolioService> logger)
        {
            _logger = logger;

            _restClient = new BinanceClient(new BinanceClientOptions
            {
                ApiCredentials = new ApiCredentials(ApiKey, SecretKey),
                AutoTimestamp = true,
                AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
                TradeRulesBehaviour = TradeRulesBehaviour.AutoComply,
#if DEBUG
                LogLevel = LogLevel.Debug,
                LogWriters = new List<ILogger> { _logger } // TODO: FIX
#endif
            });

            _socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
            {
                ApiCredentials = new ApiCredentials(ApiKey, SecretKey),
                AutoReconnect = true,
                ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
                LogLevel = LogLevel.Debug,
                LogWriters = new List<ILogger> { _logger } // TODO: FIX
#endif
            });
        }

        /// <summary>
        /// Starts the user-data stream, subscribes to its updates and then keeps
        /// the listen key alive until <paramref name="stoppingToken"/> is cancelled.
        /// </summary>
        /// <param name="stoppingToken">Signalled when the host is shutting the service down.</param>
        /// <returns>A task that completes once the stream has been shut down.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Create listen key
            var listenKeyResult = await _restClient.Spot.UserStream.StartUserStreamAsync(stoppingToken).ConfigureAwait(false);

            if (!listenKeyResult.Success)
            {
                _logger.LogError("Failed to start the user data stream");
                return;
            }

            var listenKey = listenKeyResult.Data;
            _listenKey = listenKey;

            // From here on the listen key exists, so every exit path (normal return,
            // cancellation, exception) must run the asynchronous cleanup in `finally`.
            // This replaces stoppingToken.Register(ShutDown), which could only block.
            try
            {
                // Subscribe to web socket
                _userDataUpdateSubscription = await _socketClient.Spot.SubscribeToUserDataUpdatesAsync(listenKey,
                    null,
                    null,
                    data =>
                    {
                        _logger.LogInformation("ASD {Balances}", data.Data.Balances);
                    },
                    data =>
                    {
                        _logger.LogInformation("BALANCE DELTA {BalanceDelta}", data.Data.BalanceDelta);
                    }).ConfigureAwait(false);

                if (!_userDataUpdateSubscription.Success)
                {
                    _logger.LogError("Failed to subscribe to user data updates");
                    return;
                }

                // Events can only be subscribed with +=, never assigned.
                _userDataUpdateSubscription.Data.Exception += PortfolioService_Exception;

                // Keep the listen key alive. The loop is awaited inline (no Task.Run) so
                // that ExecuteAsync stays running for the lifetime of the service.
                while (!stoppingToken.IsCancellationRequested)
                {
                    var keepAliveResult = await _restClient.Spot.UserStream.KeepAliveUserStreamAsync(listenKey, stoppingToken).ConfigureAwait(false);

                    if (!keepAliveResult.Success)
                    {
                        _logger.LogWarning("Failed to keep the user data stream alive");
                    }

                    // Throws OperationCanceledException on shutdown; `finally` still runs.
                    await Task.Delay(KeepAliveInterval, stoppingToken).ConfigureAwait(false);
                }
            }
            finally
            {
                await ShutDownAsync().ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Detaches the exception handler from the socket subscription and closes
        /// the listen key. Safe to call when either was never created.
        /// </summary>
        private async Task ShutDownAsync()
        {
            // Unsubscribe from Exception event. Data is checked because a failed
            // subscription result may carry no subscription object.
            if (_userDataUpdateSubscription?.Data is { } subscription)
            {
                subscription.Exception -= PortfolioService_Exception;
            }

            _userDataUpdateSubscription = null;

            // Stop listen key. The stopping token is deliberately not passed:
            // it is usually already cancelled by the time this runs.
            var listenKey = _listenKey;
            _listenKey = null;

            if (listenKey is null)
            {
                return;
            }

            var stopResult = await _restClient.Spot.UserStream.StopUserStreamAsync(listenKey).ConfigureAwait(false);

            if (!stopResult.Success)
            {
                _logger.LogWarning("Failed to stop the user data stream");
            }
        }

        /// <summary>
        /// Logs an exception raised by the user-data socket subscription.
        /// </summary>
        /// <param name="ex">The exception reported by the subscription.</param>
        private void PortfolioService_Exception(Exception ex)
        {
            // Log as an error and attach the exception itself so sinks get type and message too.
            _logger.LogError(ex, "Exception: {StackTrace}", ex.StackTrace);
        }
    }
}

CodePudding user response:

  1. Wrap your shutdown logic in an IAsyncDisposable. You can either write your own implementation or use an anonymous disposable like AsyncDisposable from my Nito.Disposables library.
  2. Use an await using declaration to asynchronously wait for the disposal.

When the background service is shut down, it will raise an OperationCanceledException. The await using will ensure the disposal asynchronously completes before propagating that exception.

/// <summary>
/// Starts the user-data stream, subscribes to its updates and keeps the listen
/// key alive until <paramref name="stoppingToken"/> is cancelled. The listen key
/// is stopped asynchronously when the method exits, by return or by exception.
/// </summary>
/// <param name="stoppingToken">Signalled when the host is shutting the service down.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  // Create listen key
  var listenKeyResult = await _restClient.Spot.UserStream.StartUserStreamAsync(stoppingToken).ConfigureAwait(false);
  if (!listenKeyResult.Success)
    return;

  var listenKey = listenKeyResult.Data;
  _listenKey = listenKey;

  // Disposed (and awaited) when this method exits. The stopping token is not
  // passed to the stop call because it is already cancelled during shutdown.
  await using var disposeListenKey = new AsyncDisposable(async () => await _restClient.Spot.UserStream.StopUserStreamAsync(listenKey));

  // Subscribe to web socket
  _userDataUpdateSubscription = await _socketClient.Spot.SubscribeToUserDataUpdatesAsync(listenKey,
      null,
      null,
      data =>
      {
        _logger.LogInformation("ASD {Balances}", data.Data.Balances);
      },
      data =>
      {
        _logger.LogInformation("BALANCE DELTA {BalanceDelta}", data.Data.BalanceDelta);
      }).ConfigureAwait(false);
  if (!_userDataUpdateSubscription.Success)
    return;

  // Events can only be subscribed with +=, never assigned.
  var subscription = _userDataUpdateSubscription.Data;
  subscription.Exception += PortfolioService_Exception;
  using var disposeExceptionHandler = new Disposable(() =>
      subscription.Exception -= PortfolioService_Exception);

  // Keep the listen key alive. The loop must be awaited inline: wrapping it in an
  // un-awaited Task.Run would let the method return immediately, which runs the
  // disposables above right away, and disposing the still-running Task would throw.
  while (!stoppingToken.IsCancellationRequested)
  {
    // Listen key will be alive for 60 minutes
    await _restClient.Spot.UserStream.KeepAliveUserStreamAsync(listenKey, stoppingToken).ConfigureAwait(false);
    await Task.Delay(TimeSpan.FromMinutes(50), stoppingToken).ConfigureAwait(false);
  }
}
  • Related