Java Stream GroupBy and Reduce

Time:09-18

I have an Item class with code, quantity, and amount fields, and a list that may contain many items (some with the same code). I want to group the items by code and sum up their quantities and amounts.

I was able to achieve half of it using the stream's groupingBy and reducing collectors. The grouping works, but the reduce step produces one single combined item repeated across the different codes (the groupingBy keys).

Shouldn't reduce here reduce the list of items for each code from the map? Why is it returning the same combined item for all keys?

Below is a sample code.

import java.util.List;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.Map;

class HelloWorld {
    public static void main(String[] args) {
        List<Item> itemList = Arrays.asList(
            createItem("CODE1", 1, 12),
            createItem("CODE2", 4, 22),
            createItem("CODE3", 5, 50),
            createItem("CODE4", 2, 11),
            createItem("CODE4", 8, 20),
            createItem("CODE2", 1, 42)
        );
        
        Map<String, Item> aggregatedItems = itemList
            .stream()
            .collect(Collectors.groupingBy(
                Item::getCode,
                Collectors.reducing(new Item(), (aggregatedItem, item) -> {
                    int aggregatedQuantity = aggregatedItem.getQuantity();
                    double aggregatedAmount = aggregatedItem.getAmount();
                    
                    aggregatedItem.setQuantity(aggregatedQuantity + item.getQuantity());
                    aggregatedItem.setAmount(aggregatedAmount + item.getAmount());
                    
                    return aggregatedItem;
                })
            ));
        
        System.out.println("Map total size: " + aggregatedItems.size()); // expected 4
        System.out.println();
        aggregatedItems.forEach((key, value) -> {
            System.out.println("key: " + key);
            System.out.println("value - quantity: " + value.getQuantity() + " - amount: " + value.getAmount());
            System.out.println();
        });
    }
    
    private static Item createItem(String code, int quantity, double amount) {
        Item item = new Item();
        item.setCode(code);
        item.setQuantity(quantity);
        item.setAmount(amount);
        return item;
    }
}

class Item {
    private String code;
    private int quantity;
    private double amount;
    
    public Item() {
        quantity = 0;
        amount = 0.0;
    }
    
    public String getCode() { return code; }
    public int getQuantity() { return quantity; }
    public double getAmount() { return amount; }
    
    public void setCode(String code) { this.code = code; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public void setAmount(double amount) { this.amount = amount; }
}

and below is the output.

Map total size: 4

key: CODE2
value - quantity: 21 - amount: 157.0

key: CODE1
value - quantity: 21 - amount: 157.0

key: CODE4
value - quantity: 21 - amount: 157.0

key: CODE3
value - quantity: 21 - amount: 157.0

CodePudding user response:

You must not modify the input arguments to Collectors.reducing. new Item() is only executed once and all your reduction operations will share the same "aggregation instance". In other words: the map will contain the same value instance 4 times (you can easily check yourself with System.identityHashCode() or by comparing for reference-equality: aggregatedItems.get("CODE1") == aggregatedItems.get("CODE2")).
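For instance, a stripped-down sketch along these lines (class and method names here are illustrative, not from the original post) makes the sharing visible:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class IdentityCheck {
    // Trimmed-down Item holder, just enough for the demonstration.
    static class Item {
        String code;
        int quantity;
    }

    static Item item(String code, int quantity) {
        Item i = new Item();
        i.code = code;
        i.quantity = quantity;
        return i;
    }

    static Map<String, Item> aggregateBuggy(List<Item> items) {
        // Same broken pattern as in the question: the single identity
        // instance is mutated and returned by the accumulator.
        return items.stream().collect(Collectors.groupingBy(
            i -> i.code,
            Collectors.reducing(new Item(), (acc, i) -> {
                acc.quantity += i.quantity;
                return acc;
            })));
    }

    public static void main(String[] args) {
        Map<String, Item> result = aggregateBuggy(Arrays.asList(
            item("CODE1", 1), item("CODE2", 4), item("CODE2", 1)));

        // All map values are literally the same object:
        System.out.println(result.get("CODE1") == result.get("CODE2")); // true
        System.out.println(System.identityHashCode(result.get("CODE1")));
        System.out.println(System.identityHashCode(result.get("CODE2")));
    }
}
```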

Instead, return a new result instance:

        final Map<String, Item> aggregatedItems = itemList
            .stream()
            .collect(Collectors.groupingBy(
                Item::getCode,
                Collectors.reducing(new Item(), (item1, item2) -> {
                    final Item reduced = new Item();
                    reduced.setQuantity(item1.getQuantity() + item2.getQuantity());
                    reduced.setAmount(item1.getAmount() + item2.getAmount());
                    return reduced;
                })
            ));

Output:

Map total size: 4

key: CODE2
value - quantity: 5 - amount: 64.0

key: CODE1
value - quantity: 1 - amount: 12.0

key: CODE4
value - quantity: 10 - amount: 31.0

key: CODE3
value - quantity: 5 - amount: 50.0

CodePudding user response:

You are using reducing, which assumes that you won't mutate the arguments passed in. reducing won't create new Items for you for each group; it expects you to create new Items and return them from the lambda, like this:

// this works as expected
.collect(Collectors.groupingBy(
    Item::getCode,
    Collectors.reducing(new Item(), (item1, item2) -> createItem(
        item1.getCode(),
        item1.getQuantity() + item2.getQuantity(),
        item1.getAmount()   + item2.getAmount()
    ))
));

so it is well suited to immutable objects such as numbers or strings.

Since you are not creating new Items in your code, reducing keeps on reusing that same instance, resulting in the behaviour you see.

If you want to mutate the objects, you can do mutable reduction in a thread safe way with Collector.of:

.collect(Collectors.groupingBy(
    Item::getCode,
    Collector.of(Item::new, (aggregatedItem, item) -> {
        int aggregatedQuantity = aggregatedItem.getQuantity();
        double aggregatedAmount = aggregatedItem.getAmount();

        aggregatedItem.setQuantity(aggregatedQuantity + item.getQuantity());
        aggregatedItem.setAmount(aggregatedAmount + item.getAmount());
    }, (item1, item2) -> createItem(
        item1.getCode(),
        item1.getQuantity() + item2.getQuantity(),
        item1.getAmount()   + item2.getAmount()
    ))
));

Notice that you now pass the reference to Item's constructor, i.e. a way to create new Items when necessary, as opposed to just a single new Item(). In addition, you also provide a third argument, the combiner, that tells the collector how to create a new item from two existing ones, which will be used if this collector is used in a concurrent situation. (See here for more info about the combiner)
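To see all three pieces together, here is a self-contained sketch (with a minimal stand-in Item class) that runs the mutable reduction over a parallel stream; because the supplier hands each partial reduction its own fresh container, mutation inside the accumulator is safe, and the combiner merges the per-thread results:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ParallelDemo {
    // Minimal stand-in for the question's Item class.
    static class Item {
        String code;
        int quantity;
        double amount;
    }

    static Item item(String code, int quantity, double amount) {
        Item i = new Item();
        i.code = code;
        i.quantity = quantity;
        i.amount = amount;
        return i;
    }

    static Map<String, Item> aggregate(List<Item> items) {
        return items.parallelStream().collect(Collectors.groupingBy(
            i -> i.code,
            Collector.of(
                Item::new,          // supplier: a fresh container per partial result
                (acc, i) -> {       // accumulator: mutate our own container freely
                    acc.quantity += i.quantity;
                    acc.amount += i.amount;
                },
                (a, b) -> {         // combiner: merge two partial containers
                    a.quantity += b.quantity;
                    a.amount += b.amount;
                    return a;
                })));
    }

    public static void main(String[] args) {
        Map<String, Item> m = aggregate(Arrays.asList(
            item("CODE2", 4, 22), item("CODE4", 2, 11),
            item("CODE4", 8, 20), item("CODE2", 1, 42)));
        m.forEach((k, v) ->
            System.out.println(k + " -> " + v.quantity + " / " + v.amount));
    }
}
```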

This contrast between Collector.of and Collectors.reducing is the same contrast between Stream.reduce and Stream.collect. Learn more here.

CodePudding user response:

Mutable reduction vs Immutable reduction

In this case, Collectors.reducing() isn't the right tool because it is meant for immutable reduction, i.e. for a folding operation in which every reduction step creates a new immutable object.

But instead of generating a new object at each reduction step, you're changing the state of the object provided as an identity.

As a consequence, you're getting an incorrect result because the identity object is created only once. This single Item instance is used for accumulation, and a reference to it ends up in every value of the map.

More elaborate information you can find in the Stream API documentation, specifically in these parts: Reduction and Mutable Reduction.

And here's a short quote explaining how Stream.reduce() works (the mechanism behind Collectors.reducing() is the same):

The accumulator function takes a partial result and the next element, and produces a new partial result.

Use mutable reduction

The problem can be fixed by generating a new instance of Item while accumulating values mapped to the same key, but a more performant approach is to use mutable reduction instead.

For that, you can implement a custom collector created via static method Collector.of():

Map<String, Item> aggregatedItems = itemList.stream()
    .collect(Collectors.groupingBy(
        Item::getCode,
        Collector.of(
            Item::new,   // supplier - creates the mutable container of the collector
            Item::merge, // accumulator - defines how stream elements are accumulated
            Item::merge  // combiner - merges two containers when the stream runs in parallel
        )
    ));

For convenience, you can introduce a merge() method responsible for accumulating the properties of two items. It avoids repeating the same logic in the accumulator and the combiner, and keeps the collector implementation lean and readable.

public class Item {
    private String code;
    private int quantity;
    private double amount;
    
    // getters, constructor, etc.
    
    public Item merge(Item other) {
        this.quantity += other.quantity;
        this.amount += other.amount;
        return this;
    }
}
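Putting it together, a runnable sketch (filling in the elided constructor with an illustrative all-args one) could look like this; note that Item::merge serves as both the BiConsumer accumulator (its return value is simply discarded) and the BinaryOperator combiner:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class MergeDemo {
    static class Item {
        String code;
        int quantity;
        double amount;

        Item() {}

        // Illustrative all-args constructor, standing in for the
        // "getters, constructor, etc." elided in the answer.
        Item(String code, int quantity, double amount) {
            this.code = code;
            this.quantity = quantity;
            this.amount = amount;
        }

        Item merge(Item other) {
            this.quantity += other.quantity;
            this.amount += other.amount;
            return this;
        }
    }

    static Map<String, Item> aggregate(List<Item> items) {
        return items.stream().collect(Collectors.groupingBy(
            i -> i.code,
            // merge() doubles as accumulator and combiner
            Collector.of(Item::new, Item::merge, Item::merge)));
    }

    public static void main(String[] args) {
        Map<String, Item> m = aggregate(Arrays.asList(
            new Item("CODE1", 1, 12), new Item("CODE2", 4, 22),
            new Item("CODE2", 1, 42)));
        System.out.println(m.get("CODE2").quantity + " / " + m.get("CODE2").amount); // 5 / 64.0
    }
}
```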