Creating an IObservable<T> that returns properly a ton of data (asynchronously?)

Time:10-31

I am not all that familiar with IObservable<T>, but a package I am using more or less forces it on me.

I am expected to return an IObservable<T> on which Subscribe is called at a later point, and the resulting subscription is disposed immediately after. My intent is to fill it with data from a massive dataset. It reads line by line from text files, for gigabytes' worth of data.

But I can't find a good example of how to turn my hour-long while loop into an observable in a way that doesn't expect all data to be read up front. I see that there are some variants of Observable.FromAsync, but Tasks aren't my strong suit, and I can't get that to work either.

The best I have managed so far is below. It compiles and runs, but does absolutely nothing. As far as I can tell, it never even calls the code inside the Create.

public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    return Observable.Create<Data>(async (subject, token) =>
    {
        try
        {
            FileStream stream = null;
            StreamReader sr = null;
            DateTime date = startDate;
            string path = string.Format("C:\\MYPATH\\{0}-{1}-{2}.csv", date.Year,
                date.Month.ToString("00"), date.Day.ToString("00"));
            while (date < endDate)
            {
                if (!File.Exists(path))
                {
                    date.AddDays(1);
                    continue;
                }
                stream = File.Open(path, FileMode.Create, FileAccess.Read);
                sr = new StreamReader(stream);
                while (!sr.EndOfStream)
                {
                    string line = await sr.ReadLineAsync();
                    Data d = ParseData(line);
                    subject.OnNext(d);
                }
                if (stream != null)
                {
                    sr.Close();
                    stream.Close();
                }
            }
        }
        catch (Exception ex)
        {
            try
            {
                subject.OnError(ex);
            }
            catch (Exception)
            {
                Console.WriteLine("An exception was thrown while trying to call" +
                    " OnError on the observable subject -- means you're not" +
                    " catching exceptions");
                throw;
            }
        }
    }).Publish();
}

I am not even sure whether what I want to do is technically possible, as I am not sure how the Observable pattern works. From the context, the package normally seems to expect a server connection feeding it a data stream, so I assume it can be done, but only with the right combination of Observable creation methods.

If anyone has some good documentation to read up on that explains this in a starter-friendly way, that would be nice as well.

As requested, here is how the method is called. It mostly goes into a black-box library.

IObservable<Data> data = GetHistoricalData(
    new DateTime(2021, 1, 1, 0, 0, 0, DateTimeKind.Utc),
    new DateTime(2021, 1, 5, 0, 0, 0, DateTimeKind.Utc));

// Build charts from data
IObservable<(Data, Chart)> dataWithChart = data.GenerateCharts(TimeFrame);

// Generate signals from the charts
IObservable<(Signal, Chart)> signalsWithChart = dataWithChart.GenerateSignals(
    Symbol, strategy);

// We're interested in the signals only
IObservable<Signal> signals = signalsWithChart.SelectSignals();

// Show information from each signal
IDisposable subscription = signals.Subscribe(ShowSignal);
subscription.Dispose();

CodePudding user response:

I think you should read "Introduction to Rx", especially the section about hot and cold observables, and publish and connect.

There are several smaller issues with your code. I made a simpler version of your code, and it worked after I removed the call to .Publish(). And I'm fairly certain you don't want that here.

Publish makes a wrapper that lets multiple observers share one subscription to the source, but nothing is read from the source until Connect is called on that wrapper. Since your method returns the published observable as a plain IObservable<T> and nobody calls Connect, the code inside Create never runs. You can make it work for multiple observers by using Publish(), and then calling Connect once all observers are subscribed. But publishing works better for "hot" streams, like mouse/keyboard events, for example. Your data is only read once. Any observer that subscribes after you call Connect will not get the data that has already been read. If you need multiple subscribers like this, I would return IConnectableObservable<T> instead of IObservable<T>, so the caller can Connect() to it once all observers are subscribed.
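To make the Publish/Connect behaviour concrete, here is a minimal sketch. It assumes the System.Reactive package is referenced; PublishDemo and Run are hypothetical names used only for illustration. The source pushes three values synchronously, which keeps the result deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishDemo
{
    // Hypothetical demo: counts what one subscriber has received
    // before and after Connect() is called on a published observable.
    public static (int BeforeConnect, int AfterConnect) Run()
    {
        var received = new List<int>();

        // Cold source: this lambda runs only when something subscribes to it.
        IObservable<int> cold = Observable.Create<int>(observer =>
        {
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnNext(3);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // Publish() wraps the source; subscribing to the wrapper
        // does not subscribe to the source yet.
        IConnectableObservable<int> published = cold.Publish();
        using var subscription = published.Subscribe(x => received.Add(x));
        int before = received.Count;   // 0: the Create lambda has not run

        // Connect() subscribes the wrapper to the source, which now runs.
        using var connection = published.Connect();
        int after = received.Count;    // 3

        return (before, after);
    }
}
```

Calling PublishDemo.Run() should give (0, 3): without Connect the lambda passed to Create is never invoked, which matches the "does absolutely nothing" symptom in the question.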

As for your code:

  • Keep it simple. Use using whenever you work with a stream, unless you know really well why you shouldn't.
  • Calculate the path inside the loop. Use string interpolation instead of string.Format().
  • Call subject.OnCompleted() at the end.
  • Re-assign the date variable. DateTime values in C# are immutable. date.AddDays() does not modify date, but returns a new DateTime.
  • Use the FileMode you actually want. FileMode.Create creates a new file or overwrites an existing one, which is not what you want when you only read. Use FileMode.Open to read a file that already exists.
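The point about re-assigning the date can be checked in isolation. The sketch below uses only the base class library; DateStepDemo and its methods are hypothetical names for illustration, and FileNameFor mirrors the zero-padded file name format from the question.

```csharp
using System;

public static class DateStepDemo
{
    // Builds the file name for a date, zero-padding month and day.
    public static string FileNameFor(DateTime date) =>
        $"{date.Year}-{date.Month:00}-{date.Day:00}.csv";

    // Calls AddDays without using the result, as in the question.
    public static DateTime StepIgnoringResult(DateTime date)
    {
        date.AddDays(1);   // returns a new value that is thrown away
        return date;       // still the original date
    }

    // Re-assigns the result, which is what the loop needs.
    public static DateTime StepWithAssignment(DateTime date)
    {
        date = date.AddDays(1);
        return date;
    }
}
```

Starting from 31 January 2021, StepIgnoringResult leaves the file name at 2021-01-31.csv, while StepWithAssignment moves it to 2021-02-01.csv. In the question's loop, the first variant means the date never advances, so the loop would not terminate once a file is missing.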

This works for me:

public IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    return Observable.Create<Data>(async (subject, token) =>
    {
        var date = startDate;
        try
        {
            while (date < endDate)
            {
                string path = $@"C:\MYPATH\{date.Year}-{date.Month:00}-{date.Day:00}.csv";

                if (File.Exists(path))
                {
                    using (var stream = File.Open(path, FileMode.Open, FileAccess.Read))
                    using (var sr = new StreamReader(stream))
                    {
                        while (!sr.EndOfStream)
                        {
                            var line = await sr.ReadLineAsync();
                            if (!string.IsNullOrWhiteSpace(line))
                            {
                                var data = ParseData(line);
                                subject.OnNext(data);
                            }
                        }
                    }
                }

                date = date.AddDays(1);
            }
            // Signal completion only when every file was read without error.
            subject.OnCompleted();
        }
        catch (Exception e)
        {
            subject.OnError(e);
        }
    });
}

CodePudding user response:

It seems that your code is consumed by a package that misuses the IObservable<T> monad, by treating it as an IEnumerable<T>. When someone subscribes to an observable sequence and then immediately unsubscribes, they are only going to observe the notifications pushed synchronously during the subscription, which is equivalent to synchronously enumerating an IEnumerable<T>. So my suggestion is to simplify your life by writing IEnumerable<T>-based code, and then pass the sequence to the package after converting it to an observable with the ToObservable Rx operator:

public static IEnumerable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    //...
    foreach (var line in File.ReadLines(path))
    {
        yield return ParseData(line);
    }
    //...
}

Conversion from enumerable to observable:

IObservable<Data> data = GetHistoricalData(date1, date2).ToObservable();
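For reference, here is one way the enumerable approach could look end to end. This is only a sketch under stated assumptions: DailyFiles, ReadDailyLines and the directory parameter are hypothetical, the file naming follows the yyyy-MM-dd.csv pattern from the question, and it yields raw lines instead of calling ParseData, whose definition is not shown in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;

public static class DailyFiles
{
    // Hypothetical helper: lazily yields every non-blank line of each
    // "yyyy-MM-dd.csv" file in `directory` for dates in [startDate, endDate).
    public static IEnumerable<string> ReadDailyLines(
        string directory, DateTime startDate, DateTime endDate)
    {
        for (var date = startDate; date < endDate; date = date.AddDays(1))
        {
            string name = date.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture);
            string path = Path.Combine(directory, name + ".csv");

            if (!File.Exists(path))
                continue;   // days without a file are skipped

            // File.ReadLines streams the file; it does not load it whole.
            foreach (var line in File.ReadLines(path))
            {
                if (!string.IsNullOrWhiteSpace(line))
                    yield return line;
            }
        }
    }
}
```

Because the method is an iterator, nothing is read until the caller starts enumerating, and only one line is held in memory at a time. Mapping each line through ParseData and then calling ToObservable() on the result would give the package the IObservable<Data> it expects.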

CodePudding user response:

Here you go. One nice query:

public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate) =>
    from n in Observable.Range(0, int.MaxValue)
    let d = startDate.AddDays(n)
    where d < endDate
    let path = $"C:\\MYPATH\\{d.ToString("yyyy-MM-dd")}.csv"
    where File.Exists(path)
    from l in Observable.Using(
        () => File.Open(path, FileMode.Open, FileAccess.Read),
        s => Observable.Using(
            () => new StreamReader(s),
            sr => Observable.While(
                () => !sr.EndOfStream,
                Observable.Defer(
                    () => Observable.FromAsync(
                    () => sr.ReadLineAsync())))))
    select ParseData(l);