I have multiple Java streams of the same length, and I want to perform some operation on the corresponding elements of each stream. For example, add the 1st elements of all streams, the 2nd elements of all streams, the 3rd elements of all streams, and so on.
How can we do this without first reducing each of the streams?
As a minimal reproducible example, I have the following test snippet:
@Test
void aggregateMultipleStreams() {
    Stream<Integer> s1 = Stream.of(1, 2);
    Stream<Integer> s2 = Stream.of(4, 5);
    Stream<Integer> s3 = Stream.of(7, 8);
    assertEquals(List.of(1 + 4 + 7, 2 + 5 + 8), aggregate(s1, s2, s3, 2));
}
I can write the aggregate method as follows, by first collecting every stream into a list:
private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<List<Integer>> reduced = Stream.of(s1, s2, s3)
            .map(s -> s.collect(Collectors.toList())).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, reduced.size())
            .map(v -> reduced.get(v).get(n)).sum()).collect(Collectors.toList());
}
But this could be a storage problem: if each stream contains many records, then for N records per stream we need 3N storage here.
Can we add the corresponding elements of different streams without collecting each stream first? Can we reduce multiple streams at once in Java?
After implementing @jb_dk's solution below, the solution code snippet became:
private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<Iterator<Integer>> iterators = Stream.of(s1, s2, s3)
            .map(Stream::iterator).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, iterators.size())
            .map(v -> iterators.get(v).next()).sum()).collect(Collectors.toList());
}
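To check that the iterator version really avoids collecting the streams first, one can log the moment each element is pulled from its source. The sketch below is a hypothetical demo class (LockstepDemo, plus an extra log parameter added only for this check); it assumes Java 9+ for List.of. Each Iterator::next pulls exactly one element, so the sources are consumed in lockstep rather than one stream at a time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LockstepDemo {
    // Same idea as the snippet above, generalized to a List of streams.
    // peek(log::add) records each element at the moment it is pulled from its source.
    static List<Integer> aggregate(List<Integer> log, List<Stream<Integer>> streams, int streamSize) {
        final List<Iterator<Integer>> iterators = streams.stream()
                .map(s -> s.peek(log::add).iterator())
                .collect(Collectors.toList());
        // For each position n, take one element from every iterator and sum them.
        return IntStream.range(0, streamSize)
                .mapToObj(n -> iterators.stream().mapToInt(Iterator::next).sum())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> log = new ArrayList<>();
        List<Integer> sums = aggregate(log,
                List.of(Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8)), 2);
        System.out.println(sums); // [12, 15]
        System.out.println(log);  // [1, 4, 7, 2, 5, 8]
    }
}
```

The interleaved log shows that at most one pending element per stream is in flight; only the output list of N sums is materialized.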
Answer:
Put the input streams into an array (or list) instead of three named variables, then create an output List or stream builder. Use an outer loop over the stream length and an inner loop that iterates over the input streams, reading one element from each and adding it to the current output element.
Something like the following (untested sketch):
...
{
    // Keep the input streams in one collection instead of three named variables
    final List<Stream<Integer>> instrms = List.of(s1, s2, s3);
    List<Integer> results = new ArrayList<>(); // could be a Stream.Builder<Integer> instead
    // Get the iterator (not rewindable) for each stream.
    // A List is used because Java cannot create a generic array such as Iterator<Integer>[].
    final List<Iterator<Integer>> initers = new ArrayList<>();
    for (Stream<Integer> instrm : instrms) {
        initers.add(instrm.iterator());
    }
    // Actually loop over the stream elements, outputting one
    // sum element for each position. Assumes all input streams
    // are the same length as the first one.
    while (initers.get(0).hasNext()) {
        int res1 = 0;
        for (Iterator<Integer> initer : initers) {
            res1 += initer.next();
        }
        results.add(res1);
    }
    return results; // results.build() if a stream builder
}
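The comment about a stream builder can be sketched as follows. This is an illustration only: the class BuilderSum and the method sumWithBuilder are hypothetical names, the method takes any number of streams (at least one), and like the loop above it assumes every stream is at least as long as the first. Note that Stream.Builder still stores all N sums until build() is called, so it changes the return type but saves no memory compared to the List.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BuilderSum {
    // Hypothetical helper: sums corresponding elements of the given streams
    // and emits the results through a Stream.Builder.
    // Assumes at least one stream, and that no stream is shorter than the first.
    @SafeVarargs
    static Stream<Integer> sumWithBuilder(Stream<Integer>... streams) {
        List<Iterator<Integer>> iterators = Arrays.stream(streams)
                .map(Stream::iterator)
                .collect(Collectors.toList());
        Stream.Builder<Integer> results = Stream.builder();
        while (iterators.get(0).hasNext()) {
            int sum = 0;
            for (Iterator<Integer> it : iterators) {
                sum += it.next(); // one element from each stream per round
            }
            results.add(sum);
        }
        return results.build();
    }

    public static void main(String[] args) {
        sumWithBuilder(Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8))
                .forEach(System.out::println); // prints 12, then 15
    }
}
```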
Answer:
This is basically a zip operation, where the element at position i of each stream is summed with the corresponding i-th elements of the other streams.
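For just two streams, a lazy zip can be sketched with one Iterator per stream. This is a minimal illustration, not a library API: the class ZipTwo and its zip method are hypothetical names. It stops as soon as either input runs out, and three streams can be handled by zipping twice.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ZipTwo {
    // Hypothetical helper: lazily combines the i-th elements of two streams with f.
    // The result ends as soon as either input is exhausted.
    static <A, B, R> Stream<R> zip(Stream<A> a, Stream<B> b,
                                   BiFunction<? super A, ? super B, ? extends R> f) {
        Iterator<A> ia = a.iterator();
        Iterator<B> ib = b.iterator();
        Iterator<R> zipped = new Iterator<R>() {
            @Override
            public boolean hasNext() {
                return ia.hasNext() && ib.hasNext();
            }

            @Override
            public R next() {
                return f.apply(ia.next(), ib.next());
            }
        };
        // Wrap the iterator as an ordered, sequential stream of unknown size.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(zipped, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // (1 + 4) + 7 and (2 + 5) + 8, by zipping twice
        Stream<Integer> firstTwo = zip(Stream.of(1, 2), Stream.of(4, 5), Integer::sum);
        zip(firstTwo, Stream.of(7, 8), Integer::sum).forEach(System.out::println);
    }
}
```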
You could achieve what you need by obtaining an Iterator from each stream and building a resulting Spliterator that zips every i-th element returned by the streams' iterators.
The resulting Spliterator can be built on a custom Iterator which simply iterates over the streams' Iterators and sums every next() element they return.
Here is an implementation:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@SafeVarargs
public static <T> Stream<T> zipStreams(BiFunction<? super T, ? super T, ? extends T> funZip, Stream<? extends T>... vetStreams) {
    if (vetStreams == null || vetStreams.length == 0) {
        return Stream.empty();
    }
    // Creating a List of Spliterators, one for each given stream
    List<Spliterator<? extends T>> listSpliterators = new ArrayList<>();
    for (Stream<? extends T> s : vetStreams) {
        listSpliterators.add(s.spliterator());
    }
    // Creating a final Spliterator built on the Spliterators of every stream.
    // The final Spliterator is implemented with a custom Iterator which iterates every stream's iterator and "sums" their elements.
    // The sum is actually performed via the given function funZip to keep the data type generic.
    // Retrieving the common characteristics from the streams' spliterators
    int commonCharacteristics = listSpliterators.get(0).characteristics();
    for (Spliterator<? extends T> spliterator : listSpliterators) {
        commonCharacteristics &= spliterator.characteristics();
    }
    // Zipping streams also loses the DISTINCT and SORTED properties
    commonCharacteristics &= ~(Spliterator.DISTINCT | Spliterator.SORTED);
    // Retrieving the common minimum size in case streams of different lengths have been passed.
    // A size is only known if every input spliterator is SIZED.
    long commonSize = -1;
    if ((commonCharacteristics & Spliterator.SIZED) != 0) {
        commonSize = listSpliterators.stream().mapToLong(Spliterator::getExactSizeIfKnown).min().orElse(-1L);
    }
    // Creating a list of iterators from the Spliterators of the given streams
    List<Iterator<? extends T>> listIterators = new ArrayList<>();
    for (Spliterator<? extends T> spliterator : listSpliterators) {
        listIterators.add(Spliterators.iterator(spliterator));
    }
    // Creating a result iterator built on the streams' iterators
    Iterator<T> resIterator = new Iterator<T>() {
        @Override
        public boolean hasNext() {
            // true only while every iterator still has an element
            return listIterators.stream().allMatch(Iterator::hasNext);
        }

        @Override
        public T next() {
            // "Summing" every Iterator's next() element via funZip
            T n = listIterators.get(0).next();
            for (int i = 1; i < listIterators.size(); i++) {
                n = funZip.apply(n, listIterators.get(i).next());
            }
            return n;
        }
    };
    // The resulting stream is parallel only if every given stream is parallel
    boolean areAllStreamsParallel = Arrays.stream(vetStreams).allMatch(Stream::isParallel);
    // Report a size only when it is actually known; otherwise use an unknown-size Spliterator
    Spliterator<T> resSpliterator = commonSize >= 0
            ? Spliterators.spliterator(resIterator, commonSize, commonCharacteristics)
            : Spliterators.spliteratorUnknownSize(resIterator, commonCharacteristics);
    return StreamSupport.stream(resSpliterator, areAllStreamsParallel);
}
Output
12
15
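zipStreams only reports a size when every input spliterator has the SIZED characteristic, because getExactSizeIfKnown() returns -1 otherwise. A quick way to see which sources are SIZED is sketched below; the class SizedCheck and its isSized method are hypothetical names used only for illustration.

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class SizedCheck {
    // Returns true if the stream's spliterator reports an exact size.
    static boolean isSized(Stream<?> s) {
        return s.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        // A stream created directly by Stream.of(...) knows its size...
        System.out.println(isSized(Stream.of(1, 2)));                    // true
        // ...but filter() may drop elements, so the size becomes unknown.
        System.out.println(isSized(Stream.of(1, 2).filter(x -> x > 1))); // false
    }
}
```

So zipping a filtered stream with any other stream yields a result of unknown size, even if the other inputs are SIZED.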
There is also a link to test the code with different data types: