I am not all that familiar with IObservable, but a package I am using more or less forced it on me.
I am expected to return an IObservable on which Subscribe is called at a later point; the result of that call is then disposed immediately afterwards. My intent is to feed it data from a massive dataset, read line by line from text files totalling gigabytes.
But I can't find a good example of how to turn my hour-long while loop into an observable that doesn't expect all the data to be read up front. I see that there are some variants of Observable.FromAsync, but Tasks aren't my strong suit either, and I can't get that to work.
The best I have managed so far is below. It compiles and runs, but does absolutely nothing. As far as I can tell, it never even calls the code inside the Create.
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    return Observable.Create<Data>(async (subject, token) =>
    {
        try
        {
            FileStream stream = null;
            StreamReader sr = null;
            DateTime date = startDate;
            string path = string.Format("C:\\MYPATH\\{0}-{1}-{2}.csv", date.Year, date.Month.ToString("00"), date.Day.ToString("00"));
            while(date < endDate)
            {
                if (!File.Exists(path))
                {
                    date.AddDays(1);
                    continue;
                }
                stream = File.Open(path, FileMode.Create, FileAccess.Read);
                sr = new StreamReader(stream);
                while (!sr.EndOfStream)
                {
                    string line = await sr.ReadLineAsync();
                    Data d = ParseData(line);
                    subject.OnNext(d);
                }
                if (stream != null)
                {
                    sr.Close();
                    stream.Close();
                }
            }
        }
        catch (Exception ex)
        {
            try
            {
                subject.OnError(ex);
            }
            catch (Exception)
            {
                Console.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                throw;
            }
        }
    }).Publish();
}
I am not even sure that what I want to do is technically possible, as I don't know how the Observable pattern works. From the context, the package normally seems to expect a server connection feeding it a data stream, so I assume it can be done, but only with the right combination of Observable creation methods.
If anyone knows of good, beginner-friendly documentation that explains this, that would be nice as well.
CodePudding user response:
I think you should read "Introduction to Rx", especially the section about hot and cold observables, and publish and connect.
There are several smaller issues with your code.
I made a simpler version of your code, and it worked after I removed the call to .Publish(). I'm fairly certain you don't want that here.
Publish makes a wrapper that supports multiple observers. You can make it work for multiple observers by using Publish() and then calling Connect once all observers are subscribed. But publishing works better for "hot" streams, like mouse/keyboard events. Your data is only read once: any observer that subscribes after you call Connect will not get the data that has already been read. If you need multiple subscribers like this, I would return IConnectableObservable instead of IObservable, so you can Connect() to it once all observers are subscribed.
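To make the Publish/Connect behaviour concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The source observable and the observer labels A and B are made up for illustration. The producer does not run until Connect() is called, which would explain why the code in the question appeared to do nothing.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

class PublishDemo
{
    static void Main()
    {
        // Hypothetical cold source: the lambda runs once per subscription.
        var source = Observable.Create<int>(observer =>
        {
            Console.WriteLine("producer started");
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return Disposable.Empty; // nothing to clean up
        });

        // Publish() wraps the source; subscribing to the wrapper
        // does not start the producer.
        var published = source.Publish();
        published.Subscribe(x => Console.WriteLine($"A: {x}"));
        published.Subscribe(x => Console.WriteLine($"B: {x}"));
        Console.WriteLine("subscribed, nothing produced yet");

        // Connect() subscribes to the source once and shares
        // its values with both observers.
        published.Connect();
    }
}
```

In this sketch the "subscribed, nothing produced yet" line is printed before "producer started". Dropping Publish() and subscribing to source directly would start the producer on each Subscribe call instead.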
As for your code:
- Keep it simple. Use using whenever you work with a stream, unless you know really well why you shouldn't.
- Calculate the path inside the loop. Use string interpolation instead of string.Format().
- Call subject.OnCompleted() at the end.
- Re-assign the date variable. DateTime values in C# are immutable: date.AddDays() does not modify date, but returns a new DateTime.
- Use the FileMode you actually want. FileMode.Create creates the file (or overwrites an existing one); since you only want to read files that already exist, use FileMode.Open.
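The DateTime point is the easiest one to miss, so here is a small stand-alone sketch of it. The date value is arbitrary and only there for illustration.

```csharp
using System;

class DateTimeDemo
{
    static void Main()
    {
        var date = new DateTime(2024, 1, 31);

        date.AddDays(1);              // result is discarded, date is unchanged
        Console.WriteLine(date.Day);  // 31

        date = date.AddDays(1);       // re-assign to actually advance
        Console.WriteLine(date.Day);  // 1
    }
}
```

In the question's loop the result of AddDays was discarded, so date never moved forward and the while condition could never become false.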
This works for me:
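// Requires: using System; using System.IO; using System.Reactive.Linq;
public IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
{
    return Observable.Create<Data>(async (subject, token) =>
    {
        try
        {
            // Advance one day per iteration, whether or not a file exists for that day.
            // DateTime is immutable, so the result of AddDays must be re-assigned.
            for (var date = startDate; date < endDate && !token.IsCancellationRequested; date = date.AddDays(1))
            {
                string path = $@"C:\MYPATH\{date.Year}-{date.Month:00}-{date.Day:00}.csv";
                if (!File.Exists(path))
                {
                    continue;
                }
                using (var stream = File.Open(path, FileMode.Open, FileAccess.Read))
                using (var sr = new StreamReader(stream))
                {
                    // Stop reading once the subscription has been disposed.
                    while (!token.IsCancellationRequested && !sr.EndOfStream)
                    {
                        var line = await sr.ReadLineAsync();
                        if (!string.IsNullOrWhiteSpace(line))
                        {
                            var data = ParseData(line);
                            subject.OnNext(data);
                        }
                    }
                }
            }
            subject.OnCompleted();
        }
        catch (Exception e)
        {
            // OnError ends the sequence, so nothing may be emitted after it.
            subject.OnError(e);
        }
    });
}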
CodePudding user response:
Here you go. One nice query:
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate) =>
    // One value per day in [startDate, endDate); a bounded range lets the query complete.
    from n in Observable.Range(0, Math.Max(0, (int)Math.Ceiling((endDate - startDate).TotalDays)))
    let d = startDate.AddDays(n)
    where d < endDate
    let path = $@"C:\MYPATH\{d:yyyy-MM-dd}.csv"
    where File.Exists(path)
    from l in Observable.Using(
        () => File.Open(path, FileMode.Open, FileAccess.Read),
        s => Observable.Using(
            () => new StreamReader(s),
            sr => Observable.While(
                () => !sr.EndOfStream,
                Observable.Defer(
                    () => Observable.FromAsync(
                        () => sr.ReadLineAsync())))))
    select ParseData(l);