Home > Blockchain >  How to Peek the Lowest elements from a List of sorted Streams continously
How to Peek the Lowest elements from a List of sorted Streams continously

Time:05-16

I started learning about Java Stream, and I am wondering is it possible to only peek the first element of the stream without retrieving it.

For example, I have multiple streams, and each of them have integers that are sorted in non-decreasing order, and I want to get a sorted list of all integers, so I'm thinking about using a PrioirtyQueue<Stream> sorted in non-decreasing order as well.

However, in order to get the PrioirtyQueue<Stream> to sort the streams, I need to pass in a comparator for stream to compare streams by their first element, and I am not sure how to peek the first element in each stream.

For example, I have the following streams.

[1, 2, 3, 5],
[0, 2, 4, 6]

I want to write a function getNextInteger(), that handles a list of sorted streams.

Every time I call the method, it returns the next smallest integer, so the result might be [0,1,2,2] if I call the method 4 times.

I want to use PriorityQueue to sort the streams by their first value, and retrieve the smallest one and requeue the stream if it is not empty.

CodePudding user response:

Stream is a mean of iteration over the source of data, it's meant to process the data, not to store it.

Therefore, your question is inherently incorrect. The short answer is no.

It's not a data structure, you can't compare it with PriorityQueue or any other collection.

Have a look at the documentation:

Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. By contrast, streams do not provide a means to directly access or manipulate their elements, and are instead concerned with declaratively describing their source and the computational operations which will be performed in aggregate on that source.

As I said, stream is a mean of iteration, but stream pipelines is also differs from the Iterator. You can retrieve one, and then another one. Stream pipeline will be either executed or not. It will depend on whether stream has a terminal operation or not.

For instance, this stream is valid, it will compile, but it will not be executed:

Stream.of("a", "b", "c").map(String::toUpperCase);

Because it lacks terminal operation.

Every stream should have a source, a single terminal operation which triggers the execution of the pipeline and produces the result. Intermediate operations like map() and filter(), which are meant to transform the stream, are optional.

You can't obtain the data from the stream without processing it. And once it's processed, it can no longer be used.

As a possible remedy for this problem, you might consider wrapping the stream with an object that will maintain separately the first element from the stream source and the stream itself.

public record StreamWrapper(int first, IntStream stream) {}

That approach can be used, it will be sufficient to compare streams by a single value, which should be extracted from the stream source (if the stream source allows that) at the same time when the stream gets generated.


Update

I want to write a function getNextInteger(), that handles a list of sorted streams.

Every time I call the method, it returns the next smallest integer, so the result might be [0,1,2,2] if I call the method 4 times.

That task in not suitable for streams. Unless you can blind on the fact that the data in every stream is already sorted.

If we combine all the streams in one and apply sorting, it will not cause a gigantic performance hit as it might seem at the beginning. In order to sort the data stream dumps all the elements into an array which in this case will be comprised of sorted subarrays. Because an array of a reference type will be sorted using Timsort, the algorithm implementation will spot all these sorted chunks. I.e. sorting the array composed of partially sorted subarrays isn't the same as sorting all these data from scratch. Hence, we can consider it as a possible option:

List<Stream<Integer>> streams =
List.of(Stream.of(1, 3), Stream.of(5), Stream.of(2, 6, 7),
        Stream.of(4, 9, 10), Stream.of(8));
        
streams.stream()
    .flatMap(Function.identity())
    .sorted()
    .forEach(num -> System.out.print(num   " "));

Will produce an output:

1 2 3 4 5 6 7 8 9 10 

If printing (or storing into a collection) the overall data sorted in ascending order doesn't seem satisfactory, and you insist on retrieving only a single value as a result of method invocation, I'll reiterate that it's impossible to fetch values one by one continuously from a stream.

For that you need an Iterator as the documentation suggests:

However, if the provided stream operations do not offer the desired functionality, the BaseStream.iterator() and BaseStream.spliterator() operations can be used to perform a controlled traversal.

You can implement a custom iterator that will utilize a PriorityQueue under the hood.

I assume that streams are of type that implements Comparable and streams are sorted (like in the example you've provided).

Note: if you expect source streams to be unsorted, the approach below will require a minor change: the list of iterators List<Iterators> can be replaced with a list of priority queues List<Queue>.

Iterator:

public class QueueBasedIterator<T extends Comparable<T>> implements Iterator<T> {
    private Queue<IteratorWrapper<T>> nextValues = new PriorityQueue<>();
    private List<Iterator> iterators = new ArrayList<>();
    
    @SafeVarargs
    public StreamBasedIterator(Stream<T>... streams) {
        this.iterators = Stream.of(streams).map(Stream::iterator)
            .collect(Collectors.toList());
        
        for (int i = 0; i < iterators.size(); i  ) {
            Iterator<T> iterator = iterators.get(i);
            if (iterator.hasNext()) 
                nextValues.add(new IteratorWrapper<T>(i, iterator.next()));
        }
    }
    
    @Override
    public boolean hasNext() {
        return !nextValues.isEmpty();
    }
    
    @Override
    public T next() {
        if (nextValues.isEmpty()) {
            throw new NoSuchElementException();
        }
        
        IteratorWrapper<T> next = nextValues.remove();
        Iterator<T> iterator = iterators.get(next.getPosition());
        if (iterator.hasNext())
            nextValues.add(new IteratorWrapper<T>(next.getPosition(), iterator.next()));
        
        return next.getValue();
    }
}

IteratorWrapper:

class IteratorWrapper<T extends Comparable<T>> implements Comparable<IteratorWrapper<T>> {
    private T value;
    private int position;
    
    public IteratorWrapper(int position, T value) {
        this.value = value;
        this.position = position;
    }
    
    public T getValue() {
        return value;
    }
    
    public int getPosition() {
        return position;
    }
    
    @Override
    public int compareTo(IteratorWrapper<T> o) {
        return this.value.compareTo(o.value);
    }
}

main() - demo

public static void main(String[] args) {
    QueueBasedIterator<Integer> iterator =
        new QueueBasedIterator<>(Stream.of(1, 3), Stream.of(5), Stream.of(2, 6, 7),
                                 Stream.of(4, 9, 10), Stream.of(8));
    
    while (iterator.hasNext()) {
        System.out.print(iterator.next()   " ");
    }
}

Output

1 2 3 4 5 6 7 8 9 10

CodePudding user response:

I am wondering is it possible to only peek the first element of the stream without retrieving it.

No. Peek is an intermediate operation, like map, sort, etc. They do not cause the stream to start delivering data. To do that requires a terminal operation such as reduce, forEach, or collector, etc to start the streaming process.

This also allows one to do things like the following with out storing any data. If it did, the first statement (series) would never finish as it would require infinite storage since there is no limiting method.

IntStream series = IntStream.iterate(0, i->i 1);
IntStream first10 = series.limit(10);
int[] toArray = first10.toArray();

System.out.println(Arrays.toString(toArray));

Prints

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In the above example, toArray() is the terminal operation that starts the stream. Once finished, the stream is exhausted and none of the aforementioned assignments (e.g. series, first10) may be used again.

  • Related