source1 emits A, B, C, D, etc. and never completes.
source2 emits 1, 2 and then completes.
I want to merge them into A1, B2, C1, D2, etc.
Update:
My initial attempt was to use Zip and Repeat, as suggested by Theodor. However, this causes a lock, because generating source2 is expensive. The last comment from Enigmativity addresses that problem:
source1.Zip(source2.ToEnumerable().ToArray().Repeat())
CodePudding user response:
Since you want to repeat source2 indefinitely, and you say that it is cold (in the sense that it produces the same set of values each time, and generally in the same sort of cadence) and expensive, we want to turn the IObservable<T> into a T[] to ensure it's computed once and only once.
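// Materialize source2 once; this blocks until source2 completes.
var array = source2.ToEnumerable().ToArray();
// Repeat() on an IEnumerable<T> comes from the System.Interactive package.
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));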
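To make the pairing concrete, here is a minimal sketch of the same idea on plain enumerables, using only the base class library. The names CycleSketch, Cycle and PairUp are hypothetical; Cycle merely stands in for the Repeat() operator used above, and an ordinary finite sequence stands in for source1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CycleSketch
{
    // Hypothetical stand-in for Repeat(): yields the items of an
    // already materialized list over and over.
    public static IEnumerable<T> Cycle<T>(this IReadOnlyList<T> items)
    {
        if (items.Count == 0) yield break; // nothing to repeat
        while (true)
            foreach (var item in items)
                yield return item;
    }

    // Pairs each letter with the next digit of the endlessly cycled list.
    // Zip stops as soon as the (finite) letters sequence ends.
    public static IEnumerable<string> PairUp(
        IEnumerable<string> letters, IReadOnlyList<int> digits)
        => letters.Zip(digits.Cycle(), (x, y) => $"{x}{y}");
}
```

Calling CycleSketch.PairUp(new[] { "A", "B", "C", "D" }, new[] { 1, 2 }) yields A1, B2, C1, D2. Because the digits are materialized before the pairing starts, the expensive sequence is enumerated only once.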
CodePudding user response:
Assuming that the desirable marble diagram is like this:
Source1: --------A-------B-------C--------D-------|
Source2: ----1--------------2--------|
Merged: --------A1---------B2-------C1---D2------|
Here is a ZipWithRepeated operator that has this behavior:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second)
{
    return second.Replay(replayed => first.ToAsyncEnumerable()
        .Zip(replayed.ToAsyncEnumerable().Repeat())
        .ToObservable());
}
Usage example:
var merged = source1.ZipWithRepeated(source2);
This solution requires a dependency on the System.Linq.Async and System.Interactive.Async packages, because both sequences are converted to IAsyncEnumerable<T> before the zipping.
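For readers who want to see the buffering logic spelled out, the following is a rough hand-written sketch over IAsyncEnumerable<T>, using only the base class library. It is not the operator above: the names BufferedCycleSketch, ZipWithBufferedCycle, FromValues and DemoAsync are hypothetical, and unlike Replay it pulls the second sequence lazily, one item per item of the first, instead of subscribing to it up front.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BufferedCycleSketch
{
    // Hypothetical operator: pairs each item of `first` with the next item of
    // `second`, buffering `second` while it is being consumed and cycling
    // through the buffer once `second` has completed.
    public static async IAsyncEnumerable<(TFirst First, TSecond Second)>
        ZipWithBufferedCycle<TFirst, TSecond>(
            IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
    {
        var buffer = new List<TSecond>();
        var secondCompleted = false;
        var index = 0;
        await using var secondEnumerator = second.GetAsyncEnumerator();
        await foreach (var x in first)
        {
            if (!secondCompleted)
            {
                if (await secondEnumerator.MoveNextAsync())
                {
                    // First pass: remember the item so it can be replayed later.
                    buffer.Add(secondEnumerator.Current);
                    yield return (x, secondEnumerator.Current);
                    continue;
                }
                secondCompleted = true;
            }
            if (buffer.Count == 0) yield break; // `second` was empty
            yield return (x, buffer[index]);
            index = (index + 1) % buffer.Count;
        }
    }

    // Small helper that turns fixed values into an async sequence.
    static async IAsyncEnumerable<T> FromValues<T>(params T[] values)
    {
        foreach (var value in values)
        {
            await Task.Yield();
            yield return value;
        }
    }

    // Demo with a finite first sequence so that the enumeration ends.
    public static async Task<List<string>> DemoAsync()
    {
        var result = new List<string>();
        await foreach (var (letter, digit) in
            ZipWithBufferedCycle(FromValues("A", "B", "C", "D"), FromValues(1, 2)))
        {
            result.Add($"{letter}{digit}");
        }
        return result;
    }
}
```

Awaiting BufferedCycleSketch.DemoAsync() produces A1, B2, C1, D2. The second sequence is enumerated exactly once, and every later pairing is served from the buffer.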