Merge hot source1 with a cold source2

Time:01-02

source1 emits A, B, C, D, etc. and never completes.

source2 emits 1, 2 and then completes.

I want to merge them into A1, B2, C1, D2, etc.

Update

My initial attempt was to Zip and Repeat, as suggested by Theodor. However, this creates a lock because generating source2 is expensive.

The last comment from Enigmativity addresses that problem:

source1.Zip(source2.ToEnumerable().ToArray().Repeat())

CodePudding user response:

You want to repeat source2 indefinitely, and you say that it is cold (it produces the same set of values each time it is subscribed to, generally at the same cadence) and expensive. So we turn the IObservable<T> into a T[] to ensure it is computed exactly once. Note that ToEnumerable().ToArray() blocks until source2 completes.

var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));
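
The parameterless Repeat() used above comes from the System.Interactive package. As a rough sketch of what the "cache once, then cycle" step does, here is a dependency-free version built on plain IEnumerable. RepeatForever, CycleExtensions, and Demo are hypothetical names made up for this illustration, and the two arrays stand in for the values of source1 and source2:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class CycleExtensions
{
    // Hypothetical helper (not part of Rx or System.Interactive):
    // yields the items of an already materialized list over and over.
    public static IEnumerable<T> RepeatForever<T>(this IReadOnlyList<T> items)
    {
        if (items.Count == 0) yield break; // avoid spinning forever on an empty list
        while (true)
        {
            foreach (var item in items)
                yield return item;
        }
    }
}

static class Demo
{
    public static string Pairs()
    {
        var letters = new[] { "A", "B", "C", "D" }; // stand-in for source1's values
        var numbers = new[] { 1, 2 };               // stand-in for source2's cached values

        // Enumerable.Zip stops as soon as the finite side runs out,
        // so the endless cycle on the right is safe here.
        var pairs = letters.Zip(numbers.RepeatForever(), (x, y) => $"{x}{y}");
        return string.Join(", ", pairs);
    }

    static void Main() => Console.WriteLine(Pairs()); // A1, B2, C1, D2
}
```

Because the array is built once, the expensive source2 is never re-subscribed. The same helper should also work in place of Repeat() in the Rx call, for example source1.Zip(array.RepeatForever(), (x, y) => (x, y)).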

CodePudding user response:

Assuming that the desired marble diagram looks like this:

Source1:  --------A-------B-------C--------D-------|
Source2:  ----1--------------2--------|
Merged:   --------A1---------B2-------C1---D2------|

Here is a ZipWithRepeated operator with this behavior:

static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second)
{
    return second.Replay(replayed => first.ToAsyncEnumerable()
        .Zip(replayed.ToAsyncEnumerable().Repeat())
        .ToObservable());
}

Usage example:

var merged = source1.ZipWithRepeated(source2);

This solution requires a dependency on the System.Linq.Async and System.Interactive.Async packages, because both sequences are converted to IAsyncEnumerable<T> before zipping.
