Collector toConcurrentMap() - what should the mergeFunction be for a stream of unique elements?


Create map using parallel stream

I have this working code that creates a map and populates it using a parallel stream:

SortedMap<Double, Double> map = new TreeMap<>();

for (double i = 10d; i < 50d; i += 2d)
    map.put(i, null);
    
map.entrySet().parallelStream().forEach(e -> {
    double res = bigfunction(e.getKey());
    e.setValue(100d - res); 
});

I'm working on reimplementing the above code in a proper way.

Incomplete code:

SortedMap<Double, Double> map = DoubleStream.iterate(10d, i -> i + 2d).limit(20)
    .parallel()
    .collect(Collectors.toConcurrentMap(
        k -> k, k -> {
            double res = bigfunction(k);
            return 100d - res;
        },
        null,
        TreeMap::new
    ));
  • What should the mergeFunction be in this case?
  • What changes are required to make this work?

CodePudding user response:

In this case, you can throw from the mergeFunction of the collector, since you don't expect it to be used.

(left, right) -> {
    throw new AssertionError("Duplicate has been encountered: " + left);
}

If, at a later point in time, someone altered the stream source in such a way that the stream data was no longer unique, it would immediately become clear that this change goes against the initial logic. Meanwhile, a dummy merger like (a, b) -> b would hide the fact that data is being processed and then thrown away.
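As a runnable sketch of this point (with Math.sqrt standing in for bigfunction(), which isn't shown in the question), the throwing merger stays dormant for a source of unique keys, and fires as soon as the source contains a duplicate:

```java
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;

public class ThrowingMergerDemo {

    // Stand-in for the question's bigfunction(); an assumption for illustration only.
    static double bigfunction(double x) {
        return Math.sqrt(x);
    }

    // Unique keys: the merger is never invoked.
    public static NavigableMap<Double, Double> uniqueKeys() {
        return DoubleStream.iterate(10d, i -> i + 2d).limit(20)
                .parallel()
                .boxed()
                .collect(Collectors.toConcurrentMap(
                        Function.identity(),
                        k -> 100d - bigfunction(k),
                        (left, right) -> {
                            throw new AssertionError("Duplicate has been encountered: " + left);
                        },
                        ConcurrentSkipListMap::new));
    }

    // A duplicated key immediately trips the merger, surfacing the broken assumption.
    public static boolean duplicateDetected() {
        try {
            DoubleStream.of(1d, 2d, 2d).boxed()
                    .collect(Collectors.toConcurrentMap(
                            Function.identity(),
                            k -> k,
                            (left, right) -> {
                                throw new AssertionError("Duplicate has been encountered: " + left);
                            },
                            ConcurrentSkipListMap::new));
            return false;
        } catch (AssertionError expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(uniqueKeys().size());   // 20 entries collected, merger untouched
        System.out.println(duplicateDetected());   // true: the merger threw
    }
}
```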

It's also worth being aware:

  • that while performing calculations with double, your result might turn out to be inaccurate, because this type is inherently incapable of representing precisely fractions whose denominators are not powers of 2. For that reason, some data might be lost. If your bigfunction() does a lot of calculations and losing precision is not acceptable, you might consider using BigDecimal, but keep in mind that computations would become much heavier.

  • Another important thing to consider is that the collector toConcurrentMap() expects a concurrent implementation of Map (credits to @DuncG for spotting this). Contrary to collectors optimized for a single-threaded environment, like toMap(), which create a new instance of the container for each thread and then merge the containers, concurrent collectors like toConcurrentMap() create only one container. I.e. every thread operates on the same map, so it must be thread-safe. Therefore, TreeMap is not suitable for this scenario. Instead, we can use ConcurrentSkipListMap, which also keeps its data ordered by key.

  • While dealing with a ConcurrentSkipListMap (or TreeMap), the interface NavigableMap is more useful, because it gives access to methods like higherEntry(), higherKey(), etc., which are not available on SortedMap.
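A minimal sketch of the first point, contrasting double rounding with BigDecimal (a classic repeated-addition example, not tied to any particular bigfunction()):

```java
import java.math.BigDecimal;

public class PrecisionDemo {

    // Summing 0.1 ten times in double does NOT produce exactly 1.0,
    // because 0.1 has no exact binary representation.
    public static double doubleSum() {
        double d = 0d;
        for (int i = 0; i < 10; i++) {
            d += 0.1d;
        }
        return d;   // 0.9999999999999999
    }

    // BigDecimal constructed from the decimal string keeps the value exact.
    public static BigDecimal decimalSum() {
        BigDecimal b = BigDecimal.ZERO;
        for (int i = 0; i < 10; i++) {
            b = b.add(new BigDecimal("0.1"));
        }
        return b;   // exactly 1.0
    }

    public static void main(String[] args) {
        System.out.println(doubleSum() == 1.0d);                     // false
        System.out.println(decimalSum().compareTo(BigDecimal.ONE));  // 0 (numerically equal)
    }
}
```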
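And a small sketch of the navigation methods NavigableMap adds over SortedMap, using the question's key range with placeholder values (bigfunction() isn't shown, so the values here are illustrative):

```java
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class NavigableMapDemo {

    public static NavigableMap<Double, Double> sample() {
        NavigableMap<Double, Double> map = new ConcurrentSkipListMap<>();
        for (double k = 10d; k < 50d; k += 2d) {
            map.put(k, 100d - k);   // placeholder value, not the real bigfunction()
        }
        return map;
    }

    public static void main(String[] args) {
        NavigableMap<Double, Double> map = sample();
        // Navigation methods that plain SortedMap does not offer:
        System.out.println(map.higherKey(10d));    // 12.0 - least key strictly greater than 10
        System.out.println(map.floorKey(11d));     // 10.0 - greatest key <= 11
        System.out.println(map.higherEntry(48d));  // null - 48 is the last key
    }
}
```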

Here's how the code might look:

NavigableMap<Double, Double> map = DoubleStream.iterate(10d, i -> i + 2d)
    .limit(20)
    .parallel()
    .boxed()
    .collect(Collectors.toConcurrentMap(
        Function.identity(),
        k -> 100d - bigfunction(k),
        (left, right) -> {
            throw new AssertionError("Duplicate has been encountered: " + left);
        },
        ConcurrentSkipListMap::new
    ));

CodePudding user response:

A few things. First, you need to box the primitive DoubleStream by calling boxed() in order to use it with a Collector. Second, you don't need toConcurrentMap(). It's safe to call toMap() on a parallel stream. Finally, the merge function will never be invoked in this scenario, because you're iterating over distinct keys, so I would go with what's simplest:

SortedMap<Double, Double> map = DoubleStream.iterate(10d, i -> i + 2d)
        .limit(20)
        .boxed()
        .parallel()
        .collect(Collectors.toMap(
                i -> i, i -> 100d - bigfunction(i), (a, b) -> b, TreeMap::new));
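For completeness, a runnable sketch of this approach, with Math.sqrt standing in for bigfunction(), which isn't shown in the question. Since the keys are distinct, the dummy merger (a, b) -> b is never invoked; toMap() builds a per-thread container for each split and merges them, so a plain TreeMap supplier is safe even on a parallel stream:

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;

public class ToMapDemo {

    // Stand-in for the question's bigfunction(); an assumption for illustration only.
    static double bigfunction(double x) {
        return Math.sqrt(x);
    }

    public static SortedMap<Double, Double> build() {
        return DoubleStream.iterate(10d, i -> i + 2d)
                .limit(20)
                .boxed()
                .parallel()
                .collect(Collectors.toMap(
                        i -> i, i -> 100d - bigfunction(i), (a, b) -> b, TreeMap::new));
    }

    public static void main(String[] args) {
        SortedMap<Double, Double> map = build();
        System.out.println(map.size());       // 20
        System.out.println(map.firstKey());   // 10.0
        System.out.println(map.lastKey());    // 48.0
    }
}
```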