Configuration class for the CachingConnectionFactory and RabbitTemplate:
@Configuration
@EnableRabbit
public class RabbitConfiguration {

    @Autowired
    RabbitProperties properties;

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses(properties.getAddresses());
        factory.setUsername(properties.getUsername());
        factory.setPassword(properties.getPassword());
        factory.setVirtualHost(properties.getVirtualHost());
        // Enable publisher confirms on channels created by this factory.
        factory.setPublisherConfirms(true);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        // Serialize outgoing message bodies as JSON.
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}
Class that declares the queue, the exchange, and the listener:
@Configuration("syncRabbitConfiguration")
@EnableRabbit
public class RabbitConfiguration {

    public static final String QUEUE = "external_socket.request";
    public static final String EXCHANGE = "external_socket.request";

    @Autowired
    CachingConnectionFactory cachingConnectionFactory;
    @Autowired
    RabbitAdmin rabbitAdmin;
    @Autowired
    SocketConsumer consumer;

    @Bean
    public FanoutExchange socketExchange() {
        return new FanoutExchange(EXCHANGE);
    }

    @Bean
    public Queue socketQueue() {
        // Every application instance declares this same fixed queue name.
        return new Queue(QUEUE);
    }

    @Bean
    public Binding socketBinding(Queue socketQueue, FanoutExchange socketExchange) {
        return BindingBuilder.bind(socketQueue).to(socketExchange);
    }

    @Bean
    public SimpleMessageListenerContainer smsContainer(Queue socketQueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(cachingConnectionFactory);
        // NONE: the broker treats a message as acknowledged as soon as it is delivered.
        container.setAcknowledgeMode(AcknowledgeMode.NONE);
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(socketQueue);
        container.setMessageListener(consumer);
        container.setRecoveryInterval(MQConstant.Rabbit.DEFAULT_RECOVERY_INTERVAL);
        return container;
    }
}
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
I have tried adjusting these parameters, but it made no difference. Below is the consumer code:
@Service
public class SocketConsumer implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String body = null;
        try {
            body = new String(message.getBody(), "utf-8");
            SimpleDataDto dto = JSONObject.parseObject(body, SimpleDataDto.class);
            System.out.println(dto.getSyncType() + ": message received");
            if (dto.getSyncType() == null) {
                // No sync type: push the data to every connected client.
                List<SocketIOClient> allClient = SessionManager.getAllClient();
                for (SocketIOClient socketIOClient : allClient) {
                    socketIOClient.adds(dto.getDataType(), dto.getData());
                }
            } else {
                // Otherwise push only to the client registered under that sync type.
                SocketIOClient client = SessionManager.getClient(dto.getSyncType());
                if (client != null) {
                    client.adds(dto.getDataType(), dto.getData());
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The sending code is a single line:
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, "", dataDto);
Publishing a message from the RabbitMQ web management console gives the same result.
Here are screenshots from the management console:
[Screenshot: exchange]
[Screenshot: queue]
I really can't find a solution. Please help!
CodePudding user response:
Excuse me, did you solve this? I have read through a lot of documentation without success. If you solved it, could you please share how?
CodePudding user response:
In broadcast (fanout) mode, every queue bound to the same exchange receives each message. Your code declares only one queue, so there is only one copy of each message. When two consumers read from that same queue, only one of them can receive any given message.
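To make the difference concrete, here is a minimal in-memory sketch in plain Java. It is a toy model, not the RabbitMQ or Spring AMQP API: the class `FanoutSketch`, its nested `FanoutExchange`, and the methods `sharedQueue` and `queuePerConsumer` are all hypothetical names made up for illustration, and the strict round-robin delivery is a simplification of how a broker shares one queue among several consumers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy in-memory model of fanout routing; not the RabbitMQ or Spring AMQP API. */
public class FanoutSketch {

    /** A fanout exchange copies every published message into each bound queue. */
    static class FanoutExchange {
        private final List<Queue<String>> boundQueues = new ArrayList<>();

        void bind(Queue<String> queue) {
            boundQueues.add(queue);
        }

        void publish(String message) {
            for (Queue<String> queue : boundQueues) {
                queue.add(message);
            }
        }
    }

    /** Hands queued messages round-robin to the consumers that share one queue. */
    static void drain(Queue<String> queue, List<List<String>> consumers) {
        int next = 0;
        while (!queue.isEmpty()) {
            consumers.get(next).add(queue.poll());
            next = (next + 1) % consumers.size();
        }
    }

    /** Creates two consumers, each modeled as the list of messages it received. */
    static List<List<String>> twoConsumers() {
        List<List<String>> consumers = new ArrayList<>();
        consumers.add(new ArrayList<>());
        consumers.add(new ArrayList<>());
        return consumers;
    }

    /** Layout from the question: two consumers compete on a single queue. */
    public static List<List<String>> sharedQueue(String... messages) {
        FanoutExchange exchange = new FanoutExchange();
        Queue<String> queue = new ArrayDeque<>();
        exchange.bind(queue);
        for (String message : messages) {
            exchange.publish(message);
        }
        List<List<String>> consumers = twoConsumers();
        drain(queue, consumers);
        return consumers;
    }

    /** Broadcast layout: each consumer owns a queue bound to the same exchange. */
    public static List<List<String>> queuePerConsumer(String... messages) {
        FanoutExchange exchange = new FanoutExchange();
        Queue<String> queueA = new ArrayDeque<>();
        Queue<String> queueB = new ArrayDeque<>();
        exchange.bind(queueA);
        exchange.bind(queueB);
        for (String message : messages) {
            exchange.publish(message);
        }
        List<List<String>> consumers = twoConsumers();
        drain(queueA, consumers.subList(0, 1));
        drain(queueB, consumers.subList(1, 2));
        return consumers;
    }

    public static void main(String[] args) {
        System.out.println(sharedQueue("m1", "m2"));      // prints [[m1], [m2]]
        System.out.println(queuePerConsumer("m1", "m2")); // prints [[m1, m2], [m1, m2]]
    }
}
```

With one shared queue the two consumers split the messages between them. With one queue per consumer, both see everything. In Spring AMQP, one way to get the second layout is probably for each application instance to declare its own queue (for example an `AnonymousQueue`, which gets a generated name) and bind it to the fanout exchange, instead of every instance declaring the same fixed queue name.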