In a single producer and single consumer scenario, is this queue
thread-safe?
takeIndex
andputIndex
need to beAtomicLong
?Object[] ar
need to beAtomicReferenceArray
so we can useAtomicReferenceArray#set
to updatearray
.
My understanding is that Object[] arr
needs to be an AtomicReferenceArray
to ensure visibility, so that when the producer
puts an element, the consumer can see it immediately. Otherwise, when there is already an element in the array, maybe the consumer
cannot consume it immediately (it may take the next poll
to get the data), this will cause data consume delay? But it will not bring abnormal behavior, such as consume the same element twice, or throw some unexpected exceptions (These are unacceptable).
public class SpscArrayQueue<E> implements Queue<E> {
private final int cap;
private Object[] arr; // change to AtomicReferenceArray?
long takeIndex; // change to AtomicLong?
long putIndex; // change to AtomicLong?
final int mask;
static int ceilingNextPowerOfTwo(int x) {
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1));
}
public SpscArrayQueue(int cap) {
this.cap = ceilingNextPowerOfTwo(cap);
arr = new Object[this.cap];
mask = this.cap - 1;
}
@Override
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException("Null element");
}
final long index = this.putIndex;
final int mask = this.mask;
final int offset = (int) (mask & index);
if (arr[offset] != null) {
return false;
}
arr[offset] = e; // use unsafe.xxx ? or AtomicReferenceArray#set?
putIndex = (index 1); // use AtomicLong#increment?
return true;
}
@Override
@SuppressWarnings("unchecked")
public E poll() {
final long index = this.takeIndex;
final int mask = this.mask;
final int offset = (int) (mask & index);
final E e = (E) arr[offset];
if (e == null) {
return null;
}
takeIndex = (index 1); // use AtomicLong#increment?
arr[offset] = null; // use unsafe.xxx ? or AtomicReferenceArray#set?
return e;
}
// other methods
}
CodePudding user response:
The current implementation is not thread-safe because there is no happens-before edge between producing and consuming an item. And there is no happens before edge between releasing a slot and acquiring a slot.
It has been quite some time I played with ringbuffers. But it should be sufficient to make take and put index 'synchronized' (e.g. an AtomicLong or a volatile long) and plain access to the array. That should create the happens-before edge between writing and reading an item. And typically the head and tail sequence of the ringbuffer are used to determine if there are items, no checking of null/not null within the array.
If you want to use something like this in production, I would suggest having a look at JCTools.
JCTools is properly tested and typical performance bottlenecks like false sharing have been resolved. Apart from working with a full-blown volatile, which is relatively expensive, JCTools also offers more relaxed methods using release/acquire semantics which in most cases is good enough.
If it is for educational purposes, I would check out the JCTools code and also the following java-ring-buffer.