Test environment: Windows 7, Disruptor 3.4.2, JDK 1.8.
Setup: a single producer, 10 consumers, 1,000,000 events.
The test code:
1. Disruptor:
// DisruptorTest.java
Package iot. CMCC. Test. The disruptor;
Import the Java. Util. Concurrent. The ExecutorService;
import java.util.concurrent.Executors;
The import com. Lmax. Disruptor. EventFactory;
The import com. Lmax. Disruptor. IgnoreExceptionHandler;
The import com. Lmax. Disruptor. RingBuffer;
The import com. Lmax. Disruptor. SequenceBarrier;
The import com. Lmax. Disruptor. WaitStrategy;
The import com. Lmax. Disruptor. WorkHandler;
The import com. Lmax. Disruptor. WorkerPool;
The import com. Lmax. Disruptor. YieldingWaitStrategy;
The import com. Lmax. Disruptor. DSL. ProducerType;
Public class DisruptorTest {
Private static Long time=System. CurrentTimeMillis ();
Public static void main (String [] args) {
RingBuffer
Producer Producer=null;
//create a buffer pool
The ExecutorService executor=Executors. NewFixedThreadPool (Runtime. GetRuntime (). AvailableProcessors ());
//create factory
EventFactory
@ Override
Public TradeTransaction newInstance () {
Return new TradeTransaction ();
}
};
//create bufferSize, namely RingBuffer size, must be 2 N
Int ringBufferSize=1024 * 1024;//
WaitStrategy YIELDING_WAIT=new YieldingWaitStrategy ();
//create the ringBuffer
RingBuffer=ringBuffer. Create (ProducerType. MULTI, factory, ringBufferSize YIELDING_WAIT);
SequenceBarrier barriers.=ringBuffer newBarrier ();
//create 10 consumers to deal with the same producers of messages (this message) 10 consumers don't repeat consumption
//Consumer [] consumers=new Consumer [10].
WorkHandler
For (int I=0; I & lt; WorkHandlers. Length; I++) {
WorkHandlers [I]=new Consumer ();
}
WorkerPool
New IgnoreExceptionHandler (), workHandlers);
RingBuffer. AddGatingSequences (workerPool getWorkerSequences ());
WorkerPool. Start (executor);
Producer=new producer (ringBuffer);
For (int I=0; I & lt; 1000000; I++) {
Producer. OnData (time);
}
//executor. Shutdown ();
}
}
// TradeTransaction.java
Package iot. CMCC. Test. The disruptor;
/**
 * Mutable event carried in the ring buffer: a {@code Long} value and a sequence label.
 */
public class TradeTransaction {
    /** Payload value; {@code null} until {@link #setValue(Long)} is called. */
    public Long value;

    /** Sequence label; {@code null} until {@link #setSeq(String)} is called. */
    public String seq;

    /**
     * @return the current sequence label, possibly {@code null}
     */
    public String getSeq() {
        return seq;
    }

    /**
     * @param seq the sequence label to store
     */
    public void setSeq(String seq) {
        this.seq = seq;
    }

    /**
     * @return the current payload value, possibly {@code null}
     */
    public Long getValue() {
        return value;
    }

    /**
     * @param value the payload value to store
     */
    public void setValue(Long value) {
        this.value = value;
    }
}
// Producer.java
Package iot. CMCC. Test. The disruptor;
The import com. Lmax. Disruptor. RingBuffer;
Public class Producer {
Private RingBuffer
Public Producer (RingBuffer
Enclosing ringBuffer=ringBuffer;
}
Public void onData (Long time) {
//ringBuffer can be regard as an event queue, then the event is next below a slot
Long sequence=ringBuffer. Next ();
RingBuffer. Get (sequence). SetValue (time);
RingBuffer. Get (sequence). SetSeq (sequence + "");
RingBuffer. The publish (sequence);
}
}
// Consumer.java
Package iot. CMCC. Test. The disruptor;
The import com. Lmax. Disruptor. WorkHandler;
Public class Consumer implements WorkHandler
@ Override
Public void onEvent (TradeTransaction event) throws the Exception {
//TODO Auto - generated method stub
System. The out. Println (" C1 consumers a message: "+ event. GetSeq ());
System. The out. Println (" time: "+ (System. CurrentTimeMillis () - event. GetValue ()));
}
}
// IntEventExceptionHandler.java
Package iot. CMCC. Test. The disruptor;
Import org, apache log4j. Logger;
Public class IntEventExceptionHandler {
Private static final Logger Logger=Logger. GetLogger (IntEventExceptionHandler. Class);
Public void handleEventException (Throwable ex, long sequence, Object event) {
Logger. The error (" handleEventException ", the ex);
}
Public void handleOnStartException Throwable (ex) {
Logger. The error (" handleOnStartException ", the ex);
}
Public void handleOnShutdownException Throwable (ex) {
Logger. The error (" handleOnShutdownException ", the ex);
}
}
2. JDK LinkedBlockingQueue:
// jdkTest.java
Package iot. CMCC. Test. Jdkseq;
Import the Java. Util. Concurrent. BlockingQueue;
Import the Java. Util. Concurrent. LinkedBlockingQueue;
Public class jdkTest {
Private static Long time=System. CurrentTimeMillis ();
Public static void main (String [] args) {
//Creating BlockingQueue of size 10
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull