Java Stream remove elements that fulfill a condition but leave the first X in the stream

Time:05-04

Just wanted to know if there is any better way of doing this.

I have a list of orderable elements (in this example by age, youngest first), and I want to delete all elements that fulfill a condition (in this example red elements) but keep the first 2.

// A Stream can be consumed only once, so each pipeline starts from the list.
Stream<ElementsVO> redStream = allElements.stream()
        .filter(elem -> elem.getColor() == RED)
        .sorted((c1, c2) -> c1.getAge() - c2.getAge())
        .limit(2);
Stream<ElementsVO> nonRedStream = allElements.stream()
        .filter(elem -> elem.getColor() != RED);

List<ElementsVO> resultList = Stream.concat(redStream, nonRedStream)
        .sorted((c1, c2) -> c1.getAge() - c2.getAge())
        .collect(Collectors.toList());
    

Any idea to improve this? Any way to implement an accumulator function or something like that with stream?

CodePudding user response:

You can technically do this with a stateful predicate:

Predicate<ElementsVO> statefulPredicate = new Predicate<ElementsVO>() {
  private int reds = 0;

  @Override public boolean test(ElementsVO e) {
    if (e.getColor() == RED) {
      reds++;
      return reds <= 2; // keep only the first two red elements
    }
    return true;
  }
};

Then:

List<ElementsVO> resultList =
    allElements.stream()
        .sorted(comparingInt(ElementsVO::getAge))
        .filter(statefulPredicate)
        .collect(toList());

This might work, but it is a violation of the Stream API: the documentation for Stream.filter says that the predicate should be stateless, which in general allows the stream implementation to apply the filter in any order. For small input lists, streamed sequentially, this will almost certainly be the appearance order in the list, but it's not guaranteed.

Caveat emptor. Your current way works, although you could do the partitioning of the list more efficiently using Collectors.partitioningBy to avoid iterating it twice.
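A minimal sketch of the Collectors.partitioningBy idea, assuming hypothetical stand-in types (Element, Color) and a helper name (keepYoungestReds) that are not part of the question: one pass splits the list into red and non-red elements, then only the red group is sorted and limited before the two groups are merged.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartitionDemo {
    // Hypothetical stand-ins for the question's ElementsVO and its color type.
    enum Color { RED, GREEN, BLACK }

    static final class Element {
        private final Color color;
        private final int age;
        Element(Color color, int age) { this.color = color; this.age = age; }
        Color getColor() { return color; }
        int getAge() { return age; }
        @Override public String toString() { return color + ":" + age; }
    }

    // Keeps every non-red element plus the `limit` youngest red ones, sorted by age.
    static List<Element> keepYoungestReds(List<Element> all, int limit) {
        Comparator<Element> byAge = Comparator.comparingInt(Element::getAge);
        // Single pass: key true holds the red elements, key false holds the rest.
        Map<Boolean, List<Element>> parts = all.stream()
                .collect(Collectors.partitioningBy(e -> e.getColor() == Color.RED));
        return Stream.concat(
                        parts.get(true).stream().sorted(byAge).limit(limit),
                        parts.get(false).stream())
                .sorted(byAge)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Element> all = Arrays.asList(
                new Element(Color.RED, 25), new Element(Color.RED, 23),
                new Element(Color.RED, 27), new Element(Color.BLACK, 19),
                new Element(Color.GREEN, 29));
        // prints [BLACK:19, RED:23, RED:25, GREEN:29]
        System.out.println(keepYoungestReds(all, 2));
    }
}
```

The predicate passed to partitioningBy is stateless, so this version stays within the Stream contract; the cost is one intermediate map and a final sort of the merged result.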

CodePudding user response:

I wrote a generic Collector with a predicate and a limit of elements to add which match the predicate:

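import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitedMatchCollector<T> implements Collector<T, List<T>, List<T>> {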

    private Predicate<T> filter;

    private int limit;

    public LimitedMatchCollector(Predicate<T> filter, int limit)
    {
        super();
        this.filter = filter;
        this.limit = limit;
    }

    private int count = 0;

    @Override
    public Supplier<List<T>> supplier() {
        return () -> new ArrayList<T>();
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return this::accumulator;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return this::combiner;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Stream.of(Characteristics.IDENTITY_FINISH)
                .collect(Collectors.toCollection(HashSet::new));
    }

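    // Note: count is shared by all partial lists, so this collector is only
    // reliable on sequential streams.
    public List<T> accumulator(List<T> list, T e) {
        if (filter.test(e)) {
            if (count >= limit) {
                return list; // limit reached: drop further matching elements
            }
            count++;
        }
        list.add(e);
        return list;
    }

    public List<T> combiner(List<T> left, List<T> right) {
        right.forEach(e -> {
            if (!filter.test(e)) {
                left.add(e); // non-matching elements are always kept
            } else if (count < limit) {
                left.add(e);
                count++;
            }
        });

        return left;
    }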

    @Override
    public Function<List<T>, List<T>> finisher()
    {
        return Function.identity();
    }

}

Usage:

    List<ElementsVO> list = Arrays.asList(new ElementsVO("BLUE", 1)
           ,new ElementsVO("BLUE", 2) // made color a String
           ,new ElementsVO("RED", 3)
           ,new ElementsVO("RED", 4)
           ,new ElementsVO("GREEN", 5)
           ,new ElementsVO("RED", 6)
           ,new ElementsVO("YELLOW", 7)
           );
    System.out.println(list.stream().collect(new LimitedMatchCollector<ElementsVO>( (e) -> "RED".equals(e.getColor()),2)));

CodePudding user response:

You can implement a custom collector that maintains two separate collections, one of RED and one of non-RED elements.

And since you need only the two red elements having the greatest age, you can improve performance by introducing partial sorting. That is, the collection of red elements needs to maintain an order and must always hold at most 2 elements. With that, the sorting overhead is far smaller than sorting every red element just to pick two of them.

In order to create a custom collector, you might make use of the static method Collector.of() which expects the following arguments:

  • Supplier Supplier<A> is meant to provide a mutable container which stores elements of the stream. Because we need to separate elements by color into two groups, we can use as a container a map that contains only 2 keys (true and false), denoting whether the elements mapped to this key are red. In order to store red elements and perform partial sorting, we need a collection that is capable of maintaining the order. PriorityQueue is a good choice for that purpose. To store all other elements, I've used ArrayDeque, which doesn't sort its elements and is as fast as ArrayList.
  • Accumulator BiConsumer<A,T> defines how to add elements into the mutable container provided by the supplier. For this task, the accumulator needs to guarantee that the queue containing red elements will not exceed the given size, by rejecting values that are smaller than the lowest value previously added to the queue and by removing the lowest value if the size has reached the limit and a new value needs to be added. This functionality is extracted into a separate method, tryAdd().
  • Combiner BinaryOperator<A> establishes a rule on how to merge two containers obtained while executing the stream in parallel. Here, the combiner relies on the same logic that was described for the accumulator.
  • Finisher Function<A,R> is meant to produce the final result by transforming the mutable container. In the code below, the finisher dumps the contents of both queues into a stream, sorts them and collects them into an immutable list.
  • Characteristics allow fine-tuning the collector by providing additional information on how it should function. Here the characteristic Collector.Characteristics.UNORDERED is applied. It indicates that the order in which partial results of the reduction are produced in parallel is not significant, which can improve performance of this collector with parallel streams.

The code might look like this:

public static void main(String[] args) {
    List<ElementsVO> allElements =
        List.of(new ElementsVO(Color.RED, 25), new ElementsVO(Color.RED, 23), new ElementsVO(Color.RED, 27),
                new ElementsVO(Color.BLACK, 19), new ElementsVO(Color.GREEN, 23), new ElementsVO(Color.GREEN, 29));
    
    Comparator<ElementsVO> byAge = Comparator.comparing(ElementsVO::getAge);
    
    List<ElementsVO> resultList = allElements.stream()
        .collect(Collector.of(
            () -> Map.of(true, new PriorityQueue<>(byAge),
                      false, new ArrayDeque<>()),
            (Map<Boolean, Queue<ElementsVO>> isRed, ElementsVO next) -> {
                if (next.getColor() != Color.RED) isRed.get(false).add(next);
                else tryAdd(isRed.get(true), next, byAge, 2);
            },
            (Map<Boolean, Queue<ElementsVO>> left, Map<Boolean, Queue<ElementsVO>> right) -> {
                left.get(false).addAll(right.get(false));
                right.get(true).forEach(next -> tryAdd(left.get(true), next, byAge, 2));
                return left;
            },
            (Map<Boolean, Queue<ElementsVO>> isRed) -> isRed.values().stream()
                .flatMap(Queue::stream).sorted(byAge).toList(),
            Collector.Characteristics.UNORDERED
        ));

    resultList.forEach(System.out::println);
}

The tryAdd() method is responsible for adding the next red element to the priority queue. It takes a comparator in order to determine whether the next element should be added or discarded, and the maximum size of the queue (2), to check whether the limit has been reached.

public static <T> void tryAdd(Queue<T> queue, T next, Comparator<T> comparator, int size) {
    // If the queue is full and the next element is greater than the smallest
    // element in the queue, evict the smallest element to make room.
    if (queue.size() == size && comparator.compare(queue.element(), next) < 0)
        queue.remove();
    if (queue.size() < size) queue.add(next);
}

Output

ElementsVO{color=BLACK, age=19}
ElementsVO{color=GREEN, age=23}
ElementsVO{color=RED, age=25}
ElementsVO{color=RED, age=27}
ElementsVO{color=GREEN, age=29}