Today I was using a stream that was performing a parallel()
operation after a map, however; the underlying source is an iterator which is not thread safe which is similar to the BufferedReader.lines implementation.
I originally thought that trySplit would be called on the created thread, however; I observed that the accesses to the iterator have come from multiple threads.
By example, the following silly iterator implementation is just setup with enough elements to cause splitting and also keeps track of the unique threads that accessed the hasNext
method.
class SillyIterator implements Iterator<String> {
private final ArrayDeque<String> src =
IntStream.range(1, 10000)
.mapToObj(Integer::toString)
.collect(toCollection(ArrayDeque::new));
private Map<String, String> ts = new ConcurrentHashMap<>();
public Set<String> threads() { return ts.keySet(); }
private String nextRecord = null;
@Override
public boolean hasNext() {
var n = Thread.currentThread().getName();
ts.put(n, n);
if (nextRecord != null) {
return true;
} else {
nextRecord = src.poll();
return nextRecord != null;
}
}
@Override
public String next() {
if (nextRecord != null || hasNext()) {
var rec = nextRecord;
nextRecord = null;
return rec;
}
throw new NoSuchElementException();
}
}
Using this to create a stream as follows:
var iter = new SillyIterator();
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL
), false)
.map(n -> "value = " n)
.parallel()
.collect(toList());
System.out.println(iter.threads());
This on my system output the two fork join threads as well as the main thread, which kind of scared me.
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
CodePudding user response:
Thread safety does not necessarily imply being accessed by only one thread. The important aspect is that there is no concurrent access, i.e. no access by more than one thread at the same time. If the access by different threads is temporally ordered and this ordering also ensures the necessary memory visibility, which is the responsibility of the caller, it still is a thread safe usage.
The Spliterator
documentation says:
Despite their obvious utility in parallel algorithms, spliterators are not expected to be thread-safe; instead, implementations of parallel algorithms using spliterators should ensure that the spliterator is only used by one thread at a time. This is generally easy to attain via serial thread-confinement, which often is a natural consequence of typical parallel algorithms that work by recursive decomposition.
The spliterator doesn’t need to be confined to the same thread throughout its lifetime, but there should be a clear handover at the caller’s side ensuring that the old thread stops using it before the new thread starts using it.
But the important takeaway is, the spliterator doesn’t need to be thread safe, hence, the iterator wrapped by a spliterator also doesn’t need to be thread safe.
Note that a typical behavior is splitting and handing over before starting traversal, but since an ordinary Iterator
doesn’t support splitting, the wrapping spliterator has to iterate and buffer elements to implement splitting. Therefore, the Iterator
experiences traversal by different threads (but one at a time) when the traversal has not been started from the Stream
implementation’s perspective.
That said, the lines()
implementation of BufferedReader
is a bad example which you should not follow. Since it’s centered around a single readLine()
call, it would be natural to implement Spliterator
directly instead of implementing a more complicated Iterator
and have it wrapped via spliteratorUnknownSize(…)
.
Since your example is likewise centered around a single poll()
call, it’s also straight-forward to implement Spliterator
directly:
class SillySpliterator extends Spliterators.AbstractSpliterator<String> {
private final ArrayDeque<String> src = IntStream.range(1, 10000)
.mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));
SillySpliterator() {
super(Long.MAX_VALUE, ORDERED | NONNULL);
}
@Override
public boolean tryAdvance(Consumer<? super String> action) {
String nextRecord = src.poll();
if(nextRecord == null) return false;
action.accept(nextRecord);
return true;
}
}
Depending on your real life case, you may also pass the actual deque size to the constructor and provide the SIZED
characteristic.
Then, you may use it like
var result = StreamSupport.stream(new SillySpliterator(), true)
.map(n -> "value = " n)
.collect(toList());