Home > other > Disruptor claims to be a lightning-fast queue, but in my actual tests, why is it slower than the…
Disruptor claims to be a lightning-fast queue, but in my actual tests, why is it slower than the…

Time:09-18




Test environment: Windows 7, Disruptor 3.4.2, JDK 1.8.
Single producer, 10 consumers, 1,000,000 events.

The test code:

1. The disruptor:

// DisruptorTest.java

Package iot. CMCC. Test. The disruptor;

Import the Java. Util. Concurrent. The ExecutorService;
import java.util.concurrent.Executors;

The import com. Lmax. Disruptor. EventFactory;
The import com. Lmax. Disruptor. IgnoreExceptionHandler;
The import com. Lmax. Disruptor. RingBuffer;
The import com. Lmax. Disruptor. SequenceBarrier;
The import com. Lmax. Disruptor. WaitStrategy;
The import com. Lmax. Disruptor. WorkHandler;
The import com. Lmax. Disruptor. WorkerPool;
The import com. Lmax. Disruptor. YieldingWaitStrategy;
The import com. Lmax. Disruptor. DSL. ProducerType;

Public class DisruptorTest {
Private static Long time=System. CurrentTimeMillis ();

Public static void main (String [] args) {
RingBuffer RingBuffer;
Producer Producer=null;
//create a buffer pool
The ExecutorService executor=Executors. NewFixedThreadPool (Runtime. GetRuntime (). AvailableProcessors ());
//create factory
EventFactory The factory=new EventFactory () {
@ Override
Public TradeTransaction newInstance () {
Return new TradeTransaction ();
}
};
//create bufferSize, namely RingBuffer size, must be 2 N
Int ringBufferSize=1024 * 1024;//
WaitStrategy YIELDING_WAIT=new YieldingWaitStrategy ();
//create the ringBuffer
RingBuffer=ringBuffer. Create (ProducerType. MULTI, factory, ringBufferSize YIELDING_WAIT);
SequenceBarrier barriers.=ringBuffer newBarrier ();
//create 10 consumers to deal with the same producers of messages (this message) 10 consumers don't repeat consumption
//Consumer [] consumers=new Consumer [10].
WorkHandler [] workHandlers=new WorkHandler [10].
For (int I=0; I & lt; WorkHandlers. Length; I++) {
WorkHandlers [I]=new Consumer ();
}
WorkerPool WorkerPool=new WorkerPool (ringBuffer, barriers,
New IgnoreExceptionHandler (), workHandlers);

RingBuffer. AddGatingSequences (workerPool getWorkerSequences ());
WorkerPool. Start (executor);

Producer=new producer (ringBuffer);

For (int I=0; I & lt; 1000000; I++) {
Producer. OnData (time);
}

//executor. Shutdown ();
}
}
// TradeTransaction.java

Package iot. CMCC. Test. The disruptor;

/**
 * Mutable event object stored in the ring buffer slots.
 *
 * <p>Carries a timestamp-like {@code value} and a textual sequence id {@code seq}.
 * Both fields start out {@code null} until a producer fills them in.
 */
public class TradeTransaction {

    /** Payload value; the producer stores the publish timestamp (milliseconds) here. */
    public Long value;
    /** Ring buffer sequence number of this event, rendered as a string. */
    public String seq;

    /**
     * @return the sequence id last set on this event, or {@code null} if never set
     */
    public String getSeq() {
        return seq;
    }

    /**
     * @param seq the sequence id to record on this event
     */
    public void setSeq(String seq) {
        this.seq = seq;
    }

    /**
     * @return the value last set on this event, or {@code null} if never set
     */
    public Long getValue() {
        return value;
    }

    /**
     * @param value the value to record on this event
     */
    public void setValue(Long value) {
        this.value = value;
    }
}
// Producer.java

Package iot. CMCC. Test. The disruptor;

The import com. Lmax. Disruptor. RingBuffer;

Public class Producer {
Private RingBuffer RingBuffer;

Public Producer (RingBuffer RingBuffer) {
Enclosing ringBuffer=ringBuffer;
}

Public void onData (Long time) {
//ringBuffer can be regard as an event queue, then the event is next below a slot
Long sequence=ringBuffer. Next ();
RingBuffer. Get (sequence). SetValue (time);
RingBuffer. Get (sequence). SetSeq (sequence + "");
RingBuffer. The publish (sequence);
}

}
// Consumer.java

Package iot. CMCC. Test. The disruptor;

The import com. Lmax. Disruptor. WorkHandler;

Public class Consumer implements WorkHandler {

@ Override
Public void onEvent (TradeTransaction event) throws the Exception {
//TODO Auto - generated method stub
System. The out. Println (" C1 consumers a message: "+ event. GetSeq ());
System. The out. Println (" time: "+ (System. CurrentTimeMillis () - event. GetValue ()));
}
}

// IntEventExceptionHandler.java

Package iot. CMCC. Test. The disruptor;

Import org, apache log4j. Logger;

Public class IntEventExceptionHandler {
Private static final Logger Logger=Logger. GetLogger (IntEventExceptionHandler. Class);

Public void handleEventException (Throwable ex, long sequence, Object event) {
Logger. The error (" handleEventException ", the ex);
}

Public void handleOnStartException Throwable (ex) {
Logger. The error (" handleOnStartException ", the ex);
}

Public void handleOnShutdownException Throwable (ex) {
Logger. The error (" handleOnShutdownException ", the ex);
}
}
2. The JDK LinkedBlockingQueue:

// jdkTest.java

Package iot. CMCC. Test. Jdkseq;

Import the Java. Util. Concurrent. BlockingQueue;
Import the Java. Util. Concurrent. LinkedBlockingQueue;

Public class jdkTest {
Private static Long time=System. CurrentTimeMillis ();

Public static void main (String [] args) {
//Creating BlockingQueue of size 10
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull
  • Related