Reactive.NET - returning generic object out of .Subscribe

Time:11-29

There is a combined web socket stream wss://stream.binance.com:9443/stream?streams=bnbusdt@ticker/dogeusdt@depth5 and I need the following output:

public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;

Log

Connection opened
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272226,"bids":[["0.20140000","21189.00000000"],["0.20130000","275878.00000000"],["0.20120000","290900.00000000"],["0.20110000","313592.00000000"],["0.20100000","367368.00000000"]],"asks":[["0.20150000","109090.00000000"],["0.20160000","404515.00000000"],["0.20170000","649409.00000000"],["0.20180000","360650.00000000"],["0.20190000","185381.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097890123,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07225116","x":"601.40000000","c":"599.00000000","Q":"0.45200000","b":"599.00000000","B":"122.06600000","a":"599.10000000","A":"0.54000000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286613.77200000","q":"769487994.99120000","O":1638011490067,"C":1638097890067,"F":471394573,"L":472263211,"n":868639}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272244,"bids":[["0.20140000","21189.00000000"],["0.20130000","273472.00000000"],["0.20120000","262491.00000000"],["0.20110000","350795.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","129653.00000000"],["0.20160000","411961.00000000"],["0.20170000","634098.00000000"],["0.20180000","360650.00000000"],["0.20190000","194995.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097891059,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07224947","x":"601.40000000","c":"599.00000000","Q":"0.28800000","b":"599.00000000","B":"116.83300000","a":"599.10000000","A":"35.25500000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286614.33800000","q":"769488331.33030000","O":1638011491059,"C":1638097891059,"F":471394579,"L":472263222,"n":868644}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272263,"bids":[["0.20140000","84255.00000000"],["0.20130000","263544.00000000"],["0.20120000","290699.00000000"],["0.20110000","322587.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","128586.00000000"],["0.20160000","422245.00000000"],["0.20170000","629711.00000000"],["0.20180000","365383.00000000"],["0.20190000","194995.00000000"]]}}

Brief description

The stream returns messages like those in the preceding log. I need to deserialize each one into a WebSocketResponse<T>, but T depends on the message: ticker and depth messages arrive interleaved on the same socket. I need to split the messages somehow so that the result I expect is exposed as the following properties: IObservable<WebSocketPriceTicker24Hr> and IObservable<WebSocketDepth>.

public IObservable<string> Messages => Observable
    .FromEventPattern<MessageReceivedEventArgs>(h => _webSocket.MessageReceived += h,
        h => _webSocket.MessageReceived -= h)
    .Select(e => e.EventArgs.Message);

...

public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;

_eventSubscription = _webSocket.Messages
    .Select(m => // string
    {
        Console.WriteLine($"Message: {m}");

        // TODO: What here?
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(m) ?? throw new ArgumentException(m, nameof(m));
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(m) ?? throw new ArgumentException(m, nameof(m));

        return m;
    })
    .Subscribe((result) => { // IObservable<string>
        // TODO: What here?
    });

...

// Models
public class WebSocketResponse<T>
{
    public string? Stream { get; set; }
    public T? Data { get; set; }
}

public class WebSocketPriceTicker24Hr
{
    [JsonPropertyName("e")] public string? EventType { get; set; }

    [JsonPropertyName("E")] public long EventTime { get; set; }

    [JsonPropertyName("s")] public string? Symbol { get; set; }

    [JsonPropertyName("p")] public decimal PriceChange { get; set; }

    [JsonPropertyName("P")] public decimal PriceChangePercent { get; set; }

    [JsonPropertyName("w")] public decimal WeightedAveragePrice { get; set; }

    [JsonPropertyName("x")] public decimal PreviousClosePrice { get; set; }

    [JsonPropertyName("c")] public decimal LastPrice { get; set; }

    [JsonPropertyName("Q")] public decimal LastQuantity { get; set; }

    [JsonPropertyName("b")] public decimal BestBidPrice { get; set; }

    [JsonPropertyName("B")] public decimal BestBidQuantity { get; set; }

    [JsonPropertyName("a")] public decimal BestAskPrice { get; set; }

    [JsonPropertyName("A")] public decimal BestAskQuantity { get; set; }

    [JsonPropertyName("o")] public decimal OpenPrice { get; set; }

    [JsonPropertyName("h")] public decimal HighPrice { get; set; }

    [JsonPropertyName("l")] public decimal LowPrice { get; set; }

    [JsonPropertyName("v")] public decimal TotalTradedBaseVolume { get; set; }

    [JsonPropertyName("q")] public decimal TotalTradedQuoteVolume { get; set; }

    [JsonPropertyName("O")] public long OpenTime { get; set; }

    [JsonPropertyName("C")] public long CloseTime { get; set; }

    [JsonPropertyName("F")] public long FirstTradeId { get; set; }

    [JsonPropertyName("L")] public long LastTradeId { get; set; }

    [JsonPropertyName("n")] public long Count { get; set; }
}

public class WebSocketDepth
{
    [JsonPropertyName("e")] public string? EventType { get; set; }

    [JsonPropertyName("E")] public long EventTime { get; set; }

    [JsonPropertyName("s")] public string? Symbol { get; set; }

    [JsonPropertyName("U")] public long FirstUpdateId { get; set; }

    [JsonPropertyName("u")] public long FinalUpdateId { get; set; }

    [JsonPropertyName("b")]
    public IEnumerable<IEnumerable<string>> Bids { get; set; } = Array.Empty<IEnumerable<string>>();

    [JsonPropertyName("a")]
    public IEnumerable<IEnumerable<string>> Asks { get; set; } = Array.Empty<IEnumerable<string>>();
}

CodePudding user response:

You can build one observable of type IObservable<object> which contains the WebSocketPriceTicker24Hr and WebSocketDepth objects. After that you use OfType<T>() to build an observable of a specific type.

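// Requires System.Text.Json and System.Text.Json.Serialization.
// CamelCase maps "stream"/"data" onto Stream/Data; AllowReadingFromString parses quoted decimals.
var options = new JsonSerializerOptions
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    NumberHandling = JsonNumberHandling.AllowReadingFromString
};

IObservable<object> afterDeserialize = source.Select<string, object>(it => {
    // Read the stream name first: deserializing a ticker payload as a depth object would throw.
    using var doc = JsonDocument.Parse(it);
    var stream = doc.RootElement.GetProperty("stream").GetString();
    if (stream == "bnbusdt@ticker") {
        return JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(it, options)!.Data!;
    }
    if (stream == "dogeusdt@depth5") {
        return JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(it, options)!.Data!;
    }
    throw new InvalidOperationException("Could not deserialize the JSON to any object");
});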

This is one possible way to extract the "data" part of the JSON, but the hardcoded stream checks are ugly. Note that JsonSerializer.Deserialize() does not return null when the format doesn't match: unknown properties are ignored and missing ones keep their default values, while a type mismatch throws a JsonException. You will have to make the deserialization more generic and robust. But as a proof of concept, this returns WebSocketPriceTicker24Hr and WebSocketDepth objects.
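One way to make the routing less hardcoded is to key a lookup table on the part of the stream name after the '@'. The following is only a sketch under assumptions, not the definitive approach: StreamRouter, TickerLite and PartialDepth are hypothetical names, the two models are trimmed down to a few fields, and PartialDepth follows the field names that actually appear in the depth5 log above (lastUpdateId, bids, asks). It assumes .NET 6 or later for the JsonElement.Deserialize extension.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical models shaped after the payloads in the log above.
public sealed class TickerLite
{
    [JsonPropertyName("s")] public string? Symbol { get; set; }
    [JsonPropertyName("c")] public decimal LastPrice { get; set; }
}

public sealed class PartialDepth
{
    [JsonPropertyName("lastUpdateId")] public long LastUpdateId { get; set; }
    [JsonPropertyName("bids")] public List<List<string>> Bids { get; set; } = new();
    [JsonPropertyName("asks")] public List<List<string>> Asks { get; set; } = new();
}

public static class StreamRouter
{
    private static readonly JsonSerializerOptions Options = new()
    {
        // Prices arrive as quoted strings such as "599.00000000".
        NumberHandling = JsonNumberHandling.AllowReadingFromString
    };

    // Keyed by the part of the stream name after '@' (assumed naming convention).
    private static readonly Dictionary<string, Func<JsonElement, object?>> Handlers = new()
    {
        ["ticker"] = data => data.Deserialize<TickerLite>(Options),
        ["depth5"] = data => data.Deserialize<PartialDepth>(Options),
    };

    // Returns the typed payload, or null when no handler is registered for the stream.
    public static object? Route(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        var stream = root.GetProperty("stream").GetString() ?? "";
        var kind = stream[(stream.IndexOf('@') + 1)..];
        return Handlers.TryGetValue(kind, out var handler)
            ? handler(root.GetProperty("data"))
            : null;
    }
}
```

With this in place, the Select above could become source.Select(StreamRouter.Route).Where(x => x != null), and supporting another stream only means adding one entry to the table.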

Now we can use OfType<T>() on this observable.

IObservable<WebSocketDepth> onlyDepths = afterDeserialize
    .OfType<WebSocketDepth>();

IObservable<WebSocketPriceTicker24Hr> onlyTicker = afterDeserialize
    .OfType<WebSocketPriceTicker24Hr>();

From there you can subscribe to the onlyDepths and onlyTicker observables.
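One thing to watch: Select runs once per subscription, so subscribing to both filtered observables would deserialize every message twice. A possible remedy is to share a single upstream subscription with Publish().RefCount(). The sketch below assumes the System.Reactive package; MarketStreams, Ticker, Depth and the "T:"/"D:" text format are hypothetical stand-ins for the real models and the JSON parsing, used only to keep the example short.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for WebSocketPriceTicker24Hr and WebSocketDepth.
public sealed class Ticker { public string Symbol { get; set; } = ""; }
public sealed class Depth { public long LastUpdateId { get; set; } }

public sealed class MarketStreams
{
    private readonly IObservable<object> _shared;

    // Counts how often a raw message is parsed, to make the sharing visible.
    public int ParseCount { get; private set; }

    public MarketStreams(IObservable<string> messages)
    {
        _shared = messages
            .Select(Parse)
            .Publish()    // multicast through one underlying subscription
            .RefCount();  // connect on first subscriber, disconnect after the last
    }

    public IObservable<Ticker> Tickers => _shared.OfType<Ticker>();
    public IObservable<Depth> Depths => _shared.OfType<Depth>();

    // Stand-in for the real JSON deserialization: "T:<symbol>" or "D:<id>".
    private object Parse(string raw)
    {
        ParseCount++;
        if (raw.StartsWith("T:")) return new Ticker { Symbol = raw[2..] };
        return new Depth { LastUpdateId = long.Parse(raw[2..]) };
    }
}

public static class Demo
{
    // Pushes two messages to two subscribers and returns the parse count.
    public static int Run()
    {
        var source = new Subject<string>();
        var streams = new MarketStreams(source);
        using var a = streams.Tickers.Subscribe(t => Console.WriteLine($"ticker {t.Symbol}"));
        using var b = streams.Depths.Subscribe(d => Console.WriteLine($"depth {d.LastUpdateId}"));
        source.OnNext("T:BNBUSDT");
        source.OnNext("D:42");
        return streams.ParseCount;
    }
}
```

Demo.Run() returns 2, one parse per message; without Publish().RefCount() each of the two subscribers would trigger its own parse and the count would be 4.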
