Code:
The configuration file:
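import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitConfig {
    public static final String EXCHANGE = "EXCHANGE";
    public static final String ROUTINGKEY = "will.message";

    @Autowired
    Receiver receiver;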

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("10.120.1.148");
        connectionFactory.setUsername("zhouyou");
        connectionFactory.setPassword("zhouyou@163");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // must be set for confirm callbacks
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // must be prototype scope
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE);
    }

    @Bean
    public Queue queue() {
        return new Queue("will.message", true, false, false, null); // durable queue
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY);
    }

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(10);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // use manual acknowledgement mode
        container.setMessageListener(receiver);
        return container;
    }
}
Consumer receiver:
@Service
public class Receiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        byte[] body = message.getBody();
        System.out.println("receive msg: " + new String(body));
        // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
Message sender:
@Service
public class Sender implements RabbitTemplate.ConfirmCallback {
    private RabbitTemplate rabbitTemplate;

    /**
     * Constructor injection
     */
    @Autowired
    public Sender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        // If rabbitTemplate were a singleton, the callback would be whichever one was set last
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY, content, correlationId);
    }

    /**
     * Confirm callback
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("callback id: " + correlationData);
        if (ack) {
            System.out.println("message consumed successfully");
        } else {
            System.out.println("message consumption failed: " + cause);
        }
    }
}
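A note on what the confirm callback is typically used for: tracking which published messages the broker has accepted, keyed by the correlation id. The following is only a minimal sketch in plain Java with no Spring types; `PendingConfirms`, `register`, and `onConfirm` are hypothetical names and not part of Spring AMQP. It assumes the id returned by `register` is the one passed into `CorrelationData`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers published messages until the broker confirms them. */
class PendingConfirms {
    // correlation id -> message content still waiting for a broker confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before publishing; returns the id to use as the correlation id. */
    String register(String content) {
        String id = UUID.randomUUID().toString();
        pending.put(id, content);
        return id;
    }

    /**
     * Call from the confirm callback.
     * Returns the content to re-send when the broker nacked it, or null otherwise.
     */
    String onConfirm(String id, boolean ack) {
        String content = pending.remove(id);
        return ack ? null : content;
    }

    /** Number of messages that have been published but not yet confirmed. */
    int size() {
        return pending.size();
    }
}
```

In the Sender above, `register` would be called in sendMsg and `onConfirm` in confirm; nothing in this helper involves the consumer.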
Test method:
@Autowired
private Sender sender;

@RequestMapping("/getUserInfo")
@ResponseBody
public void getUserInfo() {
    sender.sendMsg("will.message");
}
Web access address: localhost:8080/getUserInfo
Now the question is: whether or not the Receiver calls channel.basicAck(message.getMessageProperties().getDeliveryTag(), false), the program always ends up in the Sender's confirm method (public void confirm(CorrelationData correlationData, boolean ack, String cause)), and the ack parameter is true.
Could someone more experienced tell me whether I have misunderstood the mechanism or configured something incorrectly, and how this should be handled? Thank you very much!
CodePudding user response:
Your understanding is wrong. This confirmation mechanism operates between the producer and the RabbitMQ server; it only tells you whether RabbitMQ received the message you sent. It is independent of whether the consumer acknowledges the message with basicAck.
CodePudding user response:
You can refer to this article at https://blog.csdn.net/qq315737546/article/details/54176560
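
To illustrate the point made in the first reply, here is a toy in-memory model of the two separate acknowledgements. It is a sketch under stated assumptions, not the RabbitMQ or Spring AMQP API: `ToyBroker` and all of its methods are hypothetical, and a real broker does far more (routing, persistence, redelivery). It only shows that the publisher confirm fires when the broker accepts the message, while the consumer ack is tracked separately by delivery tag.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Toy in-memory stand-in for a broker; NOT the RabbitMQ client API. */
class ToyBroker {
    private final Deque<String> ready = new ArrayDeque<>();          // stored, not yet delivered
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered, awaiting consumer ack
    private long nextDeliveryTag = 1;

    /** Producer side: the confirm fires as soon as the broker has stored the message. */
    void publish(String correlationId, String body, BiConsumer<String, Boolean> confirmCallback) {
        ready.add(body);
        confirmCallback.accept(correlationId, true); // does not wait for any consumer
    }

    /** Consumer side: deliver one message; returns its delivery tag, or -1 if none is ready. */
    long deliver() {
        String body = ready.poll();
        if (body == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Consumer side: manual acknowledgement; the broker may now forget the message. */
    void basicAck(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    /** Messages delivered to a consumer but not yet acknowledged by it. */
    int unackedCount() {
        return unacked.size();
    }
}
```

In the configuration from the question, leaving basicAck commented out under AcknowledgeMode.MANUAL corresponds to never calling `basicAck` here: the message stays unacknowledged on the broker, yet the Sender's confirm still receives ack = true.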