Is an array in Java thread-safe when one thread changes a value and another thread reads it?

Time:09-26

I wrote a simple ring buffer, and in the method test1() one thread calls poll() while another thread calls offer(). I have run the test many times, but the check is always true. Can you explain why?


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@SuppressWarnings("unchecked")
public class RingBuffer<T> {
    private T[] buffer;
//    private volatile T[] buffer;
    private int readIndex;
    private int writeIndex;
    private final int capacity;
    private AtomicInteger size;

    public RingBuffer(int k) {
        this.buffer = (T[]) new Object[k];
        this.capacity = k;
        this.readIndex = 0;
        this.writeIndex = 0;
        this.size = new AtomicInteger(0);
    }

    public boolean offer(T value) {
        if (isFull()) return false;
        buffer[writeIndex] = value;
        writeIndex++;
        if (writeIndex == capacity) writeIndex -= capacity;
        size.getAndIncrement();
        return true;
    }

    public T poll() {
        if (isEmpty()) return null;
        int index = readIndex;
        T x = buffer[index];
        readIndex++;
        if (readIndex == capacity) readIndex -= capacity;
        size.getAndDecrement();
        return x;
    }

    public boolean isEmpty() {
        return size.get() == 0;
    }

    public boolean isFull() {
        return size.get() == capacity;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
    }

}

As you can see, the test1() method uses different threads, yet the check is true. Stack Overflow warned me that this question is mostly code, so I am pasting the test1() method separately here.

  public static void test1() throws ExecutionException, InterruptedException {
        RingBuffer<String> buffer = new RingBuffer<>(1000);
        AtomicBoolean writeDone1 = new AtomicBoolean(false);
        ExecutorService service = Executors.newFixedThreadPool(2);
        ExecutorService service1 = Executors.newFixedThreadPool(2);

        Callable<List<String>> cw1 = () -> {
            List<String > x = new ArrayList<>();
            int count = 0;
            for (int i = 0; i < 10000000; i++) {
                if (buffer.offer(i + "")) {
                    count++;
                    x.add(i + "");
                }
            }
            writeDone1.set(true);
            System.out.println("num write " + count);
            return x;
        };

        Callable<List<String>> cr = () -> {
            List<String> x = new ArrayList<>();
            int count = 0;
            while (!writeDone1.get()) {
                String  data = buffer.poll();
                if (data != null) {
                    x.add(data);
                    count++;
                }
            }

            while (true) {
                String data = buffer.poll();
                if (data != null) {
                    x.add(data);
                    count++;
                } else {
                    break;
                }
            }
            System.out.println("num read " + count);
            return x;
        };
        Future<List<String >> fw = service.submit(cw1);
        Future<List<String>> fr = service1.submit(cr);

        List<String> sw = fw.get();
        List<String> sr = fr.get();
        System.out.println(sw.size());
        System.out.println(sr.size());
        boolean check = true;
        for (int i = 0; i < sw.size(); i++) {
            if (!sw.get(i).equals( sr.get(i))){
                check = false;
                break;
            }
        }
        System.out.println(check);

        service.shutdown();
        service1.shutdown();
    }

With only one consumer and one producer, I can't write a test that triggers a race condition here. Can you help me?

Thank you

CodePudding user response:

There is no happens-before edge between a plain write to an array element and a plain read of the same element. So if you don't have any other ordering guarantee in place, your code suffers from a data race.

If you also allow for concurrent offers and concurrent polls, then you also have race conditions on your hands.

It has been quite some time since I played with ring buffers, but normally you use a head and a tail sequence (e.g. a long each). If you make the capacity a power of two, converting a sequence to an index becomes a cheap bit mask instead of a modulo. The head and tail sequences can start out as relatively expensive volatiles (I really would start with that), and later on you could experiment with relaxed memory order modes. The head and tail give you the appropriate happens-before edges, so you don't need to do anything special to the array. With this approach you can also get rid of 'size': it can be calculated as the difference between tail and head. The problem with a shared size field is that it causes contention between the thread reading from and the thread writing to the ring buffer. You also need to pad the head and tail fields properly to prevent false sharing.
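The head/tail approach described above could look roughly like the sketch below. It is only an illustration for exactly one producer thread and one consumer thread, not a tuned implementation: the class name SpscRingBuffer and its method names are made up for this example, null elements are rejected so that a null return from poll() always means "empty", and the padding against false sharing is left out.

```java
import java.util.Objects;

// Hypothetical single-producer/single-consumer ring buffer (illustrative names).
final class SpscRingBuffer<T> {
    private final Object[] buffer;
    private final int mask;        // capacity - 1, valid because capacity is a power of two
    private volatile long head;    // next sequence to read; written only by the consumer
    private volatile long tail;    // next sequence to write; written only by the producer

    SpscRingBuffer(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.buffer = new Object[capacity];
        this.mask = capacity - 1;
    }

    // Call from the single producer thread only.
    boolean offer(T value) {
        Objects.requireNonNull(value);
        long t = tail;
        if (t - head == buffer.length) {
            return false;                      // full (volatile read of head)
        }
        buffer[(int) (t & mask)] = value;      // plain array write
        tail = t + 1;                          // volatile write publishes the element
        return true;
    }

    // Call from the single consumer thread only.
    @SuppressWarnings("unchecked")
    T poll() {
        long h = head;
        if (h == tail) {
            return null;                       // empty (volatile read of tail)
        }
        int index = (int) (h & mask);
        T value = (T) buffer[index];           // plain read, ordered after the volatile read of tail
        buffer[index] = null;                  // drop the reference so it can be collected
        head = h + 1;                          // volatile write hands the slot back to the producer
        return value;
    }

    // Only an approximation while both threads are running.
    int size() {
        long h = head;                         // read head first so the difference is never negative
        long t = tail;
        return (int) (t - h);
    }
}
```

Each sequence is written by only one thread, so a plain volatile write is enough and no compare-and-set is needed. With more than one producer or more than one consumer this sketch is no longer safe.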

CodePudding user response:

If there is exactly one consumer and one producer, then this RingBuffer is thread-safe.

Happens-before is provided by AtomicInteger size: it is read at the start and is written at the end of both poll() and offer().

For example, let's look at poll().
Notice that:

  1. in poll() we read buffer[index] only if we've read size.get()!=0
  2. size.get()!=0 can only happen after size.getAndIncrement() in offer()
  3. size is an AtomicInteger, whose get() and getAndIncrement() have volatile read/write semantics, so it provides happens-before and makes all modifications made in offer() visible in poll()

In other words:

  • buffer[writeIndex]=value in offer()
  • -(happens-before)-> size.getAndIncrement() in offer()
  • -(happens-before)-> size.get()!=0 in poll()
  • -(happens-before)-> T x = buffer[index] in poll()
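The chain above can be reduced to a small stand-alone sketch: a plain array write followed by an AtomicInteger increment in one thread, and a read of the AtomicInteger followed by a plain array read in another. The class and method names (PublishViaAtomic, run) are invented for this illustration and are not part of the RingBuffer in the question.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a plain array write is published through an AtomicInteger.
final class PublishViaAtomic {
    static int run() {
        final int[] slot = new int[1];                 // plain, non-volatile array
        final AtomicInteger size = new AtomicInteger(0);
        final int[] seen = new int[1];

        Thread reader = new Thread(() -> {
            while (size.get() == 0) {
                // spin until the writer's increment becomes visible
            }
            seen[0] = slot[0];                         // plain read, ordered after size.get() != 0
        });
        reader.start();

        slot[0] = 42;                                  // plain write, like buffer[writeIndex] = value
        size.getAndIncrement();                        // atomic write, like the end of offer()

        try {
            reader.join();                             // join() makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Because the reader touches slot[0] only after observing a non-zero size, the Java memory model guarantees that run() returns 42. Without the AtomicInteger (for example with a plain int flag), there would be no such guarantee.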