Home > Back-end >  RocketMQ native API how to register the order at the same time consumption listener and concurrent l
RocketMQ native API how to register the order at the same time consumption listener and concurrent l

Time:04-08

Register a kind of monitor consumer code as follows, defines a @ Configuration completed listener registration
But I have a demand scenarios is the topic need to order and disorder of consumption at the same time, I try to create two Configuration initialization but only one of them, if I only create a Configuration code, the code function DefaultMQPushConsumer kind of new two DefaultMQPushConsumer a registered concurrent listener and start another listener and orderly start
Start is not an error, but the actual testing only after both consumption a registered listener, could you tell me how to write code I can do it at the same time to monitor MessageListenerOrderly and MessageListenerConcurrently
 
@ Bean
Public DefaultMQPushConsumer defaultConsumer () throws MQClientException {
The info (" create RocketMQ concurrent listener ");
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer (groupName);
Consumer. SetNamesrvAddr (namesrvAddr);
Consumer. SetConsumeThreadMin (consumeThreadMin);
Consumer. SetConsumeThreadMax (consumeThreadMax);
Consumer. SetConsumeMessageBatchMaxSize (consumeMessageBatchMaxSize);
//set to monitor
Consumer. RegisterMessageListener (consumeMsgListenerProcessor);

/* *
* set the consumer first start starts or queue queue head tail began to
* if it is not the first time you start, then according to the last position to continue consumption
*/
Consumer. SetConsumeFromWhere (ConsumeFromWhere. CONSUME_FROM_LAST_OFFSET);
/* *
* set the consumption model, cluster or broadcast, the default for cluster
*/
//consumer. SetMessageModel (MessageModel. CLUSTERING);

Try {
//set the consumers subscribe to topics and tag, if all subscribe to the topic of the tag, use the *,
String [] topicArr=switchable viewer. Split (";" );
For (String tag: topicArr) {
String [] tagArr=tag. The split (" ~ ");
Consumer. The subscribe (tagArr [0], tagArr [1]).
}
Consumer. The start ();
} the catch (MQClientException e) {
The log. The error (" consumer create failure!" );
}
Return the consumer;
}


At the same time two listeners registered
 
@ Bean
Public DefaultMQPushConsumer defaultConsumer () throws MQClientException {
The info (" create RocketMQ out-of-order listener ");
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer (groupName);
Consumer. SetNamesrvAddr (namesrvAddr);
Consumer. SetConsumeThreadMin (consumeThreadMin);
Consumer. SetConsumeThreadMax (consumeThreadMax);
Consumer. SetConsumeMessageBatchMaxSize (consumeMessageBatchMaxSize);
//set to monitor
Consumer. RegisterMessageListener (consumeMsgListenerProcessor);
Consumer. SetConsumeFromWhere (ConsumeFromWhere. CONSUME_FROM_LAST_OFFSET);

Try {
//set the consumers subscribe to topics and tag, if all subscribe to the topic of the tag, use the *,
String [] topicArr=switchable viewer. Split (";" );
For (String tag: topicArr) {
String [] tagArr=tag. The split (" ~ ");
Consumer. The subscribe (tagArr [0], tagArr [1]).
}
Consumer. The start ();
} the catch (MQClientException e) {
The log. The error (" consumer create failure!" );
}
The info (" consumer out-of-order create success groupName={}, switchable viewer={}, namesrvAddr={} ", groupName, switchable viewer, namesrvAddr);
The info (" create orderly RocketMQ listener ");
DefaultMQPushConsumer consumerOrderly=new DefaultMQPushConsumer (orderlyGroupName);
ConsumerOrderly. SetNamesrvAddr (namesrvAddr);
ConsumerOrderly. SetConsumeThreadMin (consumeThreadMin);
ConsumerOrderly. SetConsumeThreadMax (consumeThreadMax);
ConsumerOrderly. SetConsumeMessageBatchMaxSize (consumeMessageBatchMaxSize);
//set to monitor
ConsumerOrderly. RegisterMessageListener (consumeMsgOrderLyListenerProcessor);

ConsumerOrderly. SetConsumeFromWhere (ConsumeFromWhere. CONSUME_FROM_LAST_OFFSET);

Try {
//set the consumers subscribe to topics and tag, if all subscribe to the topic of the tag, use the *,
String [] topicArrOrderly=orderlyTopics. Split (";" );
For (String tag: topicArrOrderly) {
String [] tagArr=tag. The split (" ~ ");
ConsumerOrderly. Subscribe (tagArr [0], tagArr [1]).
}
ConsumerOrderly. Start ();
The info (" groupName consumer in order to create success={}, switchable viewer={}, namesrvAddr={} ", orderlyGroupName, orderlyTopics, namesrvAddr);
} the catch (MQClientException e) {
The log. The error (" consumer create failure! , {} ", e);
}
Return the consumer;
}
  • Related