Custom Collector that cannot work in parallel

Time:02-17

I have written a custom collector that uses a MessageDigest to create a hash. A MessageDigest cannot be computed in parallel, and the problem shows up in the combiner() method: there is no way to combine two MessageDigest objects. When I return null it seems to work, but if I throw an UnsupportedOperationException it fails. What is the typical way to implement a collector that doesn't support parallel operations?

class ChecksumCollector implements Collector<String, MessageDigest, ByteBuffer> {
    private String algorithm;

    ChecksumCollector(final String algorithm) {
        this.algorithm = algorithm;
    }

    @Override
    public Supplier<MessageDigest> supplier() {
        return () -> {
            try {
                return MessageDigest.getInstance(algorithm);
            } catch (NoSuchAlgorithmException e) {
                throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
            }
        };
    }

    @Override
    public BiConsumer<MessageDigest, String> accumulator() {
        return (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public BinaryOperator<MessageDigest> combiner() {
        return null; //seems to work but hash may not be correct?
        //throw new UnsupportedOperationException(LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
    }

    @Override
    public Function<MessageDigest, ByteBuffer> finisher() {
        return md -> ByteBuffer.wrap(md.digest());
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Set.of();
    }
}

Answer:

The BinaryOperator returned by a Collector's combiner() is only applied when the collector is used on a parallel stream. The combiner() method itself, however, is invoked on every call to Stream.collect(), because the JDK's implementation retrieves the combiner up front (see ReduceOps.makeRef(Collector)). That is why throwing from combiner() fails even on a sequential stream.
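To see the difference between retrieving the combiner and applying it, here is a minimal sketch. CombinerProbe and its two flags are hypothetical names made up for this illustration; the collector simply joins strings and records how the stream touches the combiner.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical probe collector: joins strings and records combiner usage.
class CombinerProbe implements Collector<String, StringBuilder, String> {
    boolean combinerRequested; // set when combiner() itself is called
    boolean combinerApplied;   // set when the returned BinaryOperator actually runs

    @Override
    public Supplier<StringBuilder> supplier() {
        return StringBuilder::new;
    }

    @Override
    public BiConsumer<StringBuilder, String> accumulator() {
        return (sb, s) -> sb.append(s);
    }

    @Override
    public BinaryOperator<StringBuilder> combiner() {
        combinerRequested = true;
        return (left, right) -> {
            combinerApplied = true;
            return left.append(right);
        };
    }

    @Override
    public Function<StringBuilder, String> finisher() {
        return StringBuilder::toString;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

class CombinerProbeDemo {
    public static void main(String[] args) {
        CombinerProbe probe = new CombinerProbe();
        // Sequential stream: no partial results need to be merged.
        String joined = Stream.of("a", "b", "c").collect(probe);
        System.out.println(joined);
        System.out.println("combiner() requested: " + probe.combinerRequested);
        System.out.println("combiner applied:     " + probe.combinerApplied);
    }
}
```

On an OpenJDK build I would expect the first flag to be set and the second to stay unset for a sequential stream. That is an implementation detail of the JDK rather than something the Collector contract promises.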

You thus have 2 options:

  • either return null, which would cause a NullPointerException if your collector is used in a parallel Stream, at the time the combiner needs to be used;
  • or return a BinaryOperator that actually throws the exception when called:
return (a, b) -> {
    throw new UnsupportedOperationException(
        LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
};

This second option would be more friendly to the unknowing developer who later changes your pipeline to make it parallel.
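As a rough sketch of that second option, the whole collector could also be built with Collector.of instead of a hand-written class. SequentialChecksum, its hex helper, and the exception messages are names I made up for this example, and the sample input in main is arbitrary.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the original question.
final class SequentialChecksum {
    private SequentialChecksum() {}

    // Builds a collector whose combiner fails loudly if a parallel stream ever calls it.
    static Collector<String, MessageDigest, ByteBuffer> collector(String algorithm) {
        return Collector.of(
            () -> newDigest(algorithm),
            (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8)),
            (a, b) -> {
                // Two partial digests cannot be merged, so reject parallel use here.
                throw new UnsupportedOperationException(
                    "SequentialChecksum does not support parallel streams");
            },
            md -> ByteBuffer.wrap(md.digest()));
    }

    private static MessageDigest newDigest(String algorithm) {
        try {
            return MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unknown digest algorithm " + algorithm, e);
        }
    }

    // Renders the digest bytes as lowercase hex without consuming the buffer.
    static String hex(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate();
        StringBuilder sb = new StringBuilder();
        while (view.hasRemaining()) {
            sb.append(String.format("%02x", view.get()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Sequential use: the combiner is retrieved but never applied.
        ByteBuffer digest = Stream.of("a", "b", "c").collect(collector("SHA-256"));
        System.out.println(hex(digest));

        // Parallel use: merging partial results triggers the exception.
        try {
            IntStream.range(0, 1000).mapToObj(Integer::toString)
                     .parallel()
                     .collect(collector("SHA-256"));
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because the exception is thrown from inside the BinaryOperator rather than from combiner() itself, sequential pipelines keep working and only a parallel pipeline hits the error.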
