How to reduce/iterate multiple java streams of same size at once?

Time:06-04

I have multiple Java streams of the same length, and I want to perform some operation on the corresponding elements of each stream. For example: add the 1st elements of all streams, then the 2nd elements of all streams, then the 3rd elements of all streams, and so on.

How can we do this without first reducing each of the streams?

As a minimal reproducible example, I have the following test:

@Test
void aggregateMultipleStreams() {
    Stream<Integer> s1 = Stream.of(1, 2);
    Stream<Integer> s2 = Stream.of(4, 5);
    Stream<Integer> s3 = Stream.of(7, 8);
    assertEquals(List.of(1 + 4 + 7, 2 + 5 + 8), aggregate(s1, s2, s3, 2));
}

I can write the aggregate method as follows, by first collecting (reducing) each stream into a list:

private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<List<Integer>> reduced = Stream.of(s1, s2, s3)
            .map(s -> s.collect(Collectors.toList())).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, reduced.size())
            .map(v -> reduced.get(v).get(n)).sum()).collect(Collectors.toList());
}

But this could be a storage problem if each stream contains many records: for N records per stream, we need 3N storage here.

Can we add the corresponding elements of different streams without collecting each stream first? Can we reduce multiple streams at once in Java?

After implementing @jb_dk's solution below, the code became:

private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3, int streamSize) {
    final List<Iterator<Integer>> iterators = Stream.of(s1, s2, s3)
            .map(Stream::iterator).collect(Collectors.toList());
    return IntStream.range(0, streamSize).mapToObj(n -> IntStream.range(0, iterators.size())
            .map(v -> iterators.get(v).next()).sum()).collect(Collectors.toList());
}
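For reference, here is a self-contained sketch of the same iterator-based idea, generalized to any number of streams via varargs. It stops as soon as any stream runs out, so it assumes no streamSize argument is needed. The names ZipSum and sumColumns are hypothetical and not part of the question. Only one element per stream is read at a time, so the result list is the only storage that grows with N.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class ZipSum {

    // Hypothetical helper: sums the i-th elements of all given streams,
    // stopping at the end of the shortest stream.
    @SafeVarargs
    static List<Integer> sumColumns(Stream<Integer>... streams) {
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (Stream<Integer> s : streams) {
            iterators.add(s.iterator()); // lazy: nothing is read yet
        }
        List<Integer> result = new ArrayList<>();
        // Continue only while every iterator still has an element
        while (!iterators.isEmpty() && iterators.stream().allMatch(Iterator::hasNext)) {
            int sum = 0;
            for (Iterator<Integer> it : iterators) {
                sum += it.next(); // read exactly one element from each stream
            }
            result.add(sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> sums = sumColumns(Stream.of(1, 2), Stream.of(4, 5), Stream.of(7, 8));
        System.out.println(sums); // [12, 15]
    }
}
```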

Answer:

Put the input streams in an array (or list) instead of 3 named variables. Then create an output List (or stream builder) and use an outer loop over the stream length and an inner loop over the input streams, reading one element from each and adding it to the current output element.

Something like this (code not tested, syntax errors may exist):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

    private List<Integer> aggregate(Stream<Integer> s1, Stream<Integer> s2, Stream<Integer> s3) {
        List<Integer> results = new ArrayList<>(); // can be a Stream.Builder<Integer> instead
        final List<Stream<Integer>> instrms = List.of(s1, s2, s3);
        // Get the iterator (not rewindable) for each stream
        final List<Iterator<Integer>> initers = new ArrayList<>();
        for (Stream<Integer> instrm : instrms) {
            initers.add(instrm.iterator());
        }
        // Actually loop over the stream elements, outputting one
        //    sum element for each element.  Assumes all input streams
        //    are same length as the first one.
        while (initers.get(0).hasNext()) {
            int res1 = 0;
            for (Iterator<Integer> initer : initers) {
                res1 += initer.next();
            }
            results.add(res1);
        }
        return results;  // results.build() if a stream builder
    }

Answer:

This is basically a zip operation, where the element at position i of each stream is summed with the i-th elements of the other streams.
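To make the zip idea concrete, here is a minimal sketch of a two-stream zip built on the streams' iterators, assuming Java 9+; the class PairZip and method zip are hypothetical helpers, not a JDK API. Summing three streams is then just zipping pairwise twice. The result is wrapped with Spliterators.spliteratorUnknownSize, so elements are pulled lazily and nothing is collected up front.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PairZip {

    // Hypothetical helper: lazily combines the i-th elements of a and b with f,
    // ending when either stream is exhausted.
    static <A, B, R> Stream<R> zip(Stream<A> a, Stream<B> b,
                                   BiFunction<? super A, ? super B, ? extends R> f) {
        Iterator<A> ia = a.iterator();
        Iterator<B> ib = b.iterator();
        Iterator<R> zipped = new Iterator<R>() {
            @Override
            public boolean hasNext() {
                return ia.hasNext() && ib.hasNext();
            }

            @Override
            public R next() {
                return f.apply(ia.next(), ib.next());
            }
        };
        // Size is unknown in general, so no SIZED characteristic is reported
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(zipped, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // (s1 zip s2) zip s3, summing at each step
        Stream<Integer> firstTwo = zip(Stream.of(1, 2), Stream.of(4, 5), Integer::sum);
        List<Integer> sums = zip(firstTwo, Stream.of(7, 8), Integer::sum)
                .collect(Collectors.toList());
        System.out.println(sums); // [12, 15]
    }
}
```

Folding a pairwise zip like this keeps each step simple, at the cost of one extra iterator layer per additional stream.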

You could achieve what you need by obtaining an Iterator from each stream and building a resulting Spliterator that zips together the i-th elements returned by the streams' iterators.

Basically, the resulting Spliterator can be built on a custom Iterator which simply iterates and sums every next() element returned by the streams' Iterators.
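As a simplified illustration of that custom Iterator, restricted to Integer sums rather than a generic combining function, here is a sketch with a hypothetical SummingIterator class. Because each call to next() pulls exactly one element from every source, the approach also works on infinite streams when the result is limited, which shows that no stream is materialized up front.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class: an Iterator that sums the next() element of every source.
public class SummingIterator implements Iterator<Integer> {
    private final List<Iterator<Integer>> sources = new ArrayList<>();

    @SafeVarargs
    SummingIterator(Stream<Integer>... streams) {
        for (Stream<Integer> s : streams) {
            sources.add(s.iterator());
        }
    }

    @Override
    public boolean hasNext() {
        // A "column" exists only if every source still has an element
        return !sources.isEmpty() && sources.stream().allMatch(Iterator::hasNext);
    }

    @Override
    public Integer next() {
        int sum = 0;
        for (Iterator<Integer> it : sources) {
            sum += it.next(); // throws NoSuchElementException if a source is exhausted
        }
        return sum;
    }

    // Wraps the iterator in a lazy, ordered, sequential stream
    @SafeVarargs
    static Stream<Integer> sumStreams(Stream<Integer>... streams) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(new SummingIterator(streams), Spliterator.ORDERED),
                false);
    }

    public static void main(String[] args) {
        // Infinite inputs are fine because elements are pulled one at a time
        List<Integer> sums = sumStreams(Stream.iterate(1, x -> x + 1), Stream.iterate(10, x -> x + 10))
                .limit(3)
                .collect(Collectors.toList());
        System.out.println(sums); // [11, 22, 33]
    }
}
```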

Here is an implementation:

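import java.util.*;
import java.util.function.BiFunction;
import java.util.stream.*;

@SafeVarargs
public static <T> Stream<T> zipStreams(BiFunction<? super T, ? super T, ? extends T> funZip, Stream<? extends T>... vetStreams) {
    if (vetStreams == null || vetStreams.length == 0) {
        return Stream.empty();
    }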

    //Creating a List of Spliterators, one for each given stream
    List<Spliterator<? extends T>> listSliterators = new ArrayList<>();
    for (Stream<? extends T> s : vetStreams) {
        listSliterators.add(s.spliterator());
    }

    // Creating a final Spliterator built on the Spliterators of every stream.
    // The final Spliterator will be implemented with a custom Iterator which basically iterates every stream's iterator and "sum" their elements.
    // The sum is actually performed via the given function funZip to keep the data type generic.

    //Retrieving the common characteristics from the streams' spliterators
    int commonCharacteristics = listSliterators.get(0).characteristics();
    for (Spliterator<? extends T> spliterator : listSliterators) {
        commonCharacteristics = commonCharacteristics & spliterator.characteristics();
    }
    //zipping two streams also loses the distinct and sorted properties
    commonCharacteristics = commonCharacteristics & ~(Spliterator.DISTINCT | Spliterator.SORTED);

    //Retrieving the common minimum size in case streams of different lengths have been passed.
    //This parameter is necessary to instantiate the final Spliterator and create the resulting stream.
    long commonSize = -1;
    if ((commonCharacteristics & Spliterator.SIZED) != 0) {
        commonSize = listSliterators.stream().map(s -> s.getExactSizeIfKnown()).min(Comparator.naturalOrder()).orElse(-1L);
    }

    //Creating a list of iterators from the Spliterators created from the given streams
    List<Iterator<? extends T>> listIterators = new ArrayList<>();
    for (Spliterator<? extends T> spliterator : listSliterators) {
        listIterators.add(Spliterators.iterator(spliterator));
    }

    //Creating a result iterator built on the streams' iterators
    Iterator<T> resIterator = new Iterator<T>() {
        @Override
        public boolean hasNext() {
            //true only while every stream's iterator still has an element
            return listIterators.stream().allMatch(Iterator::hasNext);
        }

        @Override
        public T next() {
            //Summing every Iterator's next() element
            T n = listIterators.get(0).next();
            for (int i = 1; i < listIterators.size(); i++) {
                n = funZip.apply(n, listIterators.get(i).next());
            }
            return n;
        }
    };

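    //The resulting stream is parallel only if every given stream is parallel
    boolean areAllStreamsParallel = Arrays.stream(vetStreams).allMatch(Stream::isParallel);

    //Returning a stream built from a spliterator which is in turn built on the resulting iterator zipping every given stream's element.
    //Spliterators.spliterator(iterator, size, ...) always reports SIZED, so the unknown-size variant is used when no common size is known.
    Spliterator<T> resSpliterator = commonSize >= 0
            ? Spliterators.spliterator(resIterator, commonSize, commonCharacteristics)
            : Spliterators.spliteratorUnknownSize(resIterator, commonCharacteristics);
    return StreamSupport.stream(resSpliterator, areAllStreamsParallel);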
}

Output

12
15

Here is also a link to test the code with different data types:

https://www.jdoodle.com/iembed/v0/rQr
