I'm looking for an alternative of stoppingToken.Register(ShutDown)
where I'm able to call async methods? Basically, _restClient.Spot.UserStream.StopUserStreamAsync
should be awaited. Right now it isn't.
Any other suggestions are appreciated.
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
namespace QSGEngine.Server.Services
{
public class PortfolioService : BackgroundService
{
private const string ApiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
private const string SecretKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
private readonly ILogger<PortfolioService> _logger;
private readonly IBinanceClient _restClient;
private readonly IBinanceSocketClient _socketClient;
public PortfolioService(ILogger<PortfolioService> logger)
{
_logger = logger;
_restClient = new BinanceClient(new BinanceClientOptions
{
ApiCredentials = new ApiCredentials(ApiKey, SecretKey),
AutoTimestamp = true,
AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
TradeRulesBehaviour = TradeRulesBehaviour.AutoComply,
#if DEBUG
LogLevel = LogLevel.Debug,
LogWriters = new List<ILogger> { _logger } // TODO: FIX
#endif
});
_socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
{
ApiCredentials = new ApiCredentials(ApiKey, SecretKey),
AutoReconnect = true,
ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
LogLevel = LogLevel.Debug,
LogWriters = new List<ILogger> { _logger } // TODO: FIX
#endif
});
}
private string? _listenKey;
private CallResult<UpdateSubscription>? _userDataUpdateSubscription;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.Register(ShutDown);
// Create listen key
var listenKeyResult = await _restClient.Spot.UserStream.StartUserStreamAsync(stoppingToken);
if (!listenKeyResult.Success)
{
return;
}
_listenKey = listenKeyResult.Data;
// Subscribe to web socket
_userDataUpdateSubscription = await _socketClient.Spot.SubscribeToUserDataUpdatesAsync(_listenKey,
null,
null,
data =>
{
_logger.LogInformation("ASD {Balances}", data.Data.Balances);
},
data =>
{
_logger.LogInformation("BALANCE DELTA {BalanceDelta}", data.Data.BalanceDelta);
}).ConfigureAwait(false);
if (!_userDataUpdateSubscription.Success)
{
return;
}
_userDataUpdateSubscription.Data.Exception = PortfolioService_Exception;
// Keep the listen key alive
using var keepAlive = Task.Run(async () =>
{
while (true)
{
// Listen key will be alive for 60 minutes
await _restClient.Spot.UserStream.KeepAliveUserStreamAsync(_listenKey, stoppingToken).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMinutes(50), stoppingToken).ConfigureAwait(false);
}
}, stoppingToken);
}
private void ShutDown()
{
// Unsubscribe from Exception event
if (_userDataUpdateSubscription != null)
{
_userDataUpdateSubscription.Data.Exception -= PortfolioService_Exception;
_userDataUpdateSubscription = null;
}
// Stop listen key
if (_listenKey != null)
{
_restClient.Spot.UserStream.StopUserStreamAsync(_listenKey).GetAwaiter().GetResult();
_listenKey = null;
}
}
private void PortfolioService_Exception(Exception ex)
{
_logger.LogInformation("Exception: {StackTrace}", ex.StackTrace);
}
}
}
CodePudding user response:
- Wrap your shutdown logic in an
IAsyncDisposable
. You can either write your own implementation or use an anonymous disposable likeAsyncDisposable
from myNito.Disposables
library. - Use an
await using
declaration to asynchronously wait for the disposal.
When the background service is shut down, it will raise an OperationCanceledException
. The await using
will ensure the disposal asynchronously completes before propagating that exception.
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Create listen key
var listenKeyResult = await _restClient.Spot.UserStream.StartUserStreamAsync(stoppingToken);
if (!listenKeyResult.Success)
return;
_listenKey = listenKeyResult.Data;
await using var disposeListenKey = new AsyncDisposable(async () => await _restClient.Spot.UserStream.StopUserStreamAsync(_listenKey));
// Subscribe to web socket
_userDataUpdateSubscription = await _socketClient.Spot.SubscribeToUserDataUpdatesAsync(_listenKey,
null,
null,
data =>
{
_logger.LogInformation("ASD {Balances}", data.Data.Balances);
},
data =>
{
_logger.LogInformation("BALANCE DELTA {BalanceDelta}", data.Data.BalanceDelta);
}).ConfigureAwait(false);
if (!_userDataUpdateSubscription.Success)
return;
_userDataUpdateSubscription.Data.Exception = PortfolioService_Exception;
using var disposeExceptionHandler = new Disposable(() =>
_userDataUpdateSubscription.Data.Exception -= PortfolioService_Exception);
// Keep the listen key alive
using var keepAlive = Task.Run(async () =>
{
while (true)
{
// Listen key will be alive for 60 minutes
await _restClient.Spot.UserStream.KeepAliveUserStreamAsync(_listenKey, stoppingToken).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMinutes(50), stoppingToken).ConfigureAwait(false);
}
}, stoppingToken);
}