Question: RocketMQ transactional messages, following the official example, an UNKNOW message does not trigger checkLocalTransaction

Time:12-18

I am using RocketMQ's transactional message feature. Following the official example, a message whose local transaction state is UNKNOW never triggers the listener's checkLocalTransaction method.
I tried versions 4.4 through 4.5, and it failed on all of them.

I am using the example from the transaction package of the official examples:
 



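import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        // "MyTopic" is the producer group name here, not the topic.
        TransactionMQProducer producer = new TransactionMQProducer("MyTopic");
        // Thread pool on which the client runs broker check-back requests.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        // String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

        String[] tags = new String[] {"TagA"};
        // Send a single transactional (half) message.
        for (int i = 0; i < 1; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        // Keep the producer alive so the broker has time to check back.
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}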



import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    // Transaction ID -> simulated local transaction status (0, 1, or 2).
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        // Always report UNKNOW so that the broker should check back later.
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("==========================check msg back " + msg.getMsgId() + "=====================");
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
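
To make the expected behavior concrete, here is a minimal sketch of the listener's state logic with no RocketMQ dependency. The class `CheckBackSketch`, its `State` enum, and the `execute`/`check` methods are hypothetical stand-ins for `TransactionListenerImpl`, `LocalTransactionState`, `executeLocalTransaction`, and `checkLocalTransaction`. The direct call to `check` in `main` only imitates the broker's check-back and says nothing about when, or whether, a real broker performs it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CheckBackSketch {
    // Hypothetical stand-in for RocketMQ's LocalTransactionState.
    public enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private final AtomicInteger transactionIndex = new AtomicInteger(0);
    private final Map<String, Integer> localTrans = new ConcurrentHashMap<>();

    // Mirrors executeLocalTransaction: record a status, always answer UNKNOW.
    public State execute(String transactionId) {
        int status = transactionIndex.getAndIncrement() % 3;
        localTrans.put(transactionId, status);
        return State.UNKNOW;
    }

    // Mirrors checkLocalTransaction: map the recorded status to a state.
    public State check(String transactionId) {
        Integer status = localTrans.get(transactionId);
        if (status == null) {
            return State.COMMIT_MESSAGE;
        }
        switch (status) {
            case 0:
                return State.UNKNOW;
            case 1:
                return State.COMMIT_MESSAGE;
            case 2:
                return State.ROLLBACK_MESSAGE;
            default:
                return State.COMMIT_MESSAGE;
        }
    }

    public static void main(String[] args) {
        CheckBackSketch listener = new CheckBackSketch();
        for (int i = 0; i < 3; i++) {
            String id = "tx-" + i; // made-up transaction IDs
            State first = listener.execute(id);
            // Imitates the broker's check-back request for this transaction.
            State checked = listener.check(id);
            System.out.printf("%s: execute=%s, check=%s%n", id, first, checked);
        }
    }
}
```

Note that the producer above sends only one message, so its recorded status is 0. Even once the check-back is invoked, this listener would answer UNKNOW again for that message rather than committing or rolling it back.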







When I start TransactionProducer, it sends one message, and the listener's executeLocalTransaction() method deliberately returns LocalTransactionState.UNKNOW.
I then wait for the check-back method checkLocalTransaction() to be called, but it is never entered, so I cannot observe the re-check mechanism at all.
Could someone more experienced explain what is going wrong?