RabbitMQ fanout (broadcast) mode does not deliver to multiple consumers; looking for expert help

Time:09-19

I have been debugging for a whole day and still cannot get RabbitMQ broadcast (fanout) mode to work. There are two consumers, A and B, both on the same queue: when A receives a message B does not, and when B receives it A does not. They never both receive the same message. I would appreciate an expert's answer. The code is as follows:


Configuration class for RabbitTemplate and the CachingConnectionFactory connection factory:
@Configuration
@EnableRabbit
public class RabbitConfiguration {

    @Autowired
    RabbitProperties properties;

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses(properties.getAddresses());
        factory.setUsername(properties.getUsername());
        factory.setPassword(properties.getPassword());
        factory.setVirtualHost(properties.getVirtualHost());
        factory.setPublisherConfirms(true);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

}



Configuration class that declares the queue, the exchange, and the listener:
@Configuration("syncRabbitConfiguration")
@EnableRabbit
public class RabbitConfiguration {

    public static final String QUEUE = "external_socket.request";
    public static final String EXCHANGE = "external_socket.request";

    @Autowired
    CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    RabbitAdmin rabbitAdmin;

    @Autowired
    SocketConsumer consumer;

    @Bean
    public FanoutExchange socketExchange() {
        return new FanoutExchange(EXCHANGE);
    }

    @Bean
    public Queue socketQueue() {
        return new Queue(QUEUE);
    }

    @Bean
    public Binding socketBinding(Queue socketQueue, FanoutExchange socketExchange) {
        return BindingBuilder.bind(socketQueue).to(socketExchange);
    }

    @Bean
    public SimpleMessageListenerContainer smsContainer(Queue socketQueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(cachingConnectionFactory);
        container.setAcknowledgeMode(AcknowledgeMode.NONE);
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(socketQueue);
        container.setMessageListener(consumer);
        container.setRecoveryInterval(MQConstant.Rabbit.DEFAULT_RECOVERY_INTERVAL);
        return container;
    }

}


Among these settings, I have tried adjusting the following parameters, but it made no difference:
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
Below is the consumer code:

@Service
public class SocketConsumer implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String body = null;
        try {
            body = new String(message.getBody(), "utf-8");
            SimpleDataDto dto = JSONObject.parseObject(body, SimpleDataDto.class);
            System.out.println(dto.getSyncType() + ": receive the message");
            if (dto.getSyncType() == null) {
                // No target given: push to every connected client.
                List<SocketIOClient> allClient = SessionManager.getAllClient();
                for (SocketIOClient socketIOClient : allClient) {
                    // Method name is garbled in the source post; sendEvent is assumed.
                    socketIOClient.sendEvent(dto.getDataType(), dto.getData());
                }
            } else {
                // Target given: push only to the matching client, if connected.
                SocketIOClient client = SessionManager.getClient(dto.getSyncType());
                if (client != null) {
                    client.sendEvent(dto.getDataType(), dto.getData());
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


The sending code is a single line:
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, "", dataDto);

Publishing a message from RabbitMQ's own web management UI gives the same result.
Here are the screenshots from the management UI:

Exchange


The queue



I really can't find a solution. Please help!

CodePudding user response:

Excuse me, have you solved this? I have read a lot of documentation and still can't get it working. If you have solved it, please tell me how.

CodePudding user response:

In broadcast (fanout) mode, every queue bound to the same exchange receives a copy of each message. Your code declares only one queue, so there is only one copy of each message. When two consumers consume from that one queue at the same time, only one of them can receive any given message.
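To make the difference concrete, here is a minimal plain-Java sketch that simulates the behaviour described above. It does not use the RabbitMQ client at all: `SimQueue`, `SimFanoutExchange`, `sharedQueue` and `separateQueues` are hypothetical names invented for this illustration, and the round-robin dispatch is a simplification of how a broker shares one queue among competing consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FanoutSketch {

    /** Simulated queue: each message goes to exactly one consumer (round-robin). */
    static class SimQueue {
        private final List<List<String>> inboxes = new ArrayList<>();
        private int next = 0;

        /** Registers a consumer and returns the list collecting what it receives. */
        List<String> addConsumer() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void deliver(String message) {
            if (inboxes.isEmpty()) {
                return; // simplification: a real broker would hold the message
            }
            inboxes.get(next).add(message);
            next = (next + 1) % inboxes.size();
        }
    }

    /** Simulated fanout exchange: copies every message to every bound queue. */
    static class SimFanoutExchange {
        private final List<SimQueue> queues = new ArrayList<>();

        void bind(SimQueue queue) {
            queues.add(queue);
        }

        void publish(String message) {
            for (SimQueue queue : queues) {
                queue.deliver(message);
            }
        }
    }

    /** Consumers A and B share ONE queue; returns [inboxA, inboxB]. */
    static List<List<String>> sharedQueue(String... messages) {
        SimFanoutExchange exchange = new SimFanoutExchange();
        SimQueue queue = new SimQueue();
        exchange.bind(queue);
        List<String> a = queue.addConsumer();
        List<String> b = queue.addConsumer();
        for (String m : messages) {
            exchange.publish(m);
        }
        return Arrays.asList(a, b);
    }

    /** Consumers A and B each have their OWN queue; returns [inboxA, inboxB]. */
    static List<List<String>> separateQueues(String... messages) {
        SimFanoutExchange exchange = new SimFanoutExchange();
        SimQueue queueA = new SimQueue();
        SimQueue queueB = new SimQueue();
        exchange.bind(queueA);
        exchange.bind(queueB);
        List<String> a = queueA.addConsumer();
        List<String> b = queueB.addConsumer();
        for (String m : messages) {
            exchange.publish(m);
        }
        return Arrays.asList(a, b);
    }

    public static void main(String[] args) {
        System.out.println("shared queue:    " + sharedQueue("m1", "m2"));    // [[m1], [m2]]
        System.out.println("separate queues: " + separateQueues("m1", "m2")); // [[m1, m2], [m1, m2]]
    }
}
```

With one shared queue the two consumers split the messages between them, which matches the symptom in the question. With one queue per consumer, both see every message.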

CodePudding user response:

Quoting Limitedapm's reply on the 4th floor:
In broadcast (fanout) mode, every queue bound to the same exchange receives a copy of each message. Your code declares only one queue, so there is only one copy of each message. When two consumers consume from that one queue at the same time, only one of them can receive any given message.

Yes. Each consumer needs to bind a queue with a different name to the exchange. You can let RabbitMQ generate the name, or you can choose it yourself.
If you are used to ActiveMQ, this RabbitMQ concept takes some getting used to: each consumer has to declare its own queue and consume from it.
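One possible way to apply this with Spring AMQP is sketched below. It assumes each consumer runs as its own application instance, and it swaps the fixed queue name from the question for Spring AMQP's `AnonymousQueue`, which gets a generated unique name and is non-durable, exclusive, and auto-delete. The class name `PerInstanceFanoutConfiguration` is hypothetical; the exchange name is the one from the question. This is a sketch under those assumptions, not necessarily what the original poster ended up using.

```java
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PerInstanceFanoutConfiguration { // hypothetical class name

    public static final String EXCHANGE = "external_socket.request";

    @Bean
    public FanoutExchange socketExchange() {
        return new FanoutExchange(EXCHANGE);
    }

    // Every application instance declares its own uniquely named queue,
    // so the fanout exchange delivers a copy of each message to each instance.
    @Bean
    public Queue socketQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding socketBinding(Queue socketQueue, FanoutExchange socketExchange) {
        return BindingBuilder.bind(socketQueue).to(socketExchange);
    }
}
```

The listener container from the question can stay as it is, since `container.setQueues(socketQueue)` picks up whichever `Queue` bean is injected. Because an anonymous queue disappears when its instance disconnects, messages published while an instance is down are not kept for it. If that matters, a durable queue with a distinct, stable name per instance is the alternative.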

CodePudding user response:

This problem bothered me for a long time. My approach is: