Home > OS >  ActiveMQ producers to send data for the second time start blocking problem (CMS)
ActiveMQ producers to send data for the second time start blocking problem (CMS)

Time:10-04

ActiveMQ producers to send data for the second time to start blocking, if at first you don't block, no boot consumers, type: topic,

Call way:
Lz_producer * topic_producer_=new lz_producer (" failover://(TCP://127.0.0.1:61616) ", "test. The policy", true, false);

The header file
 class lz_producer {
Private:
The Session * session_;
The Connection * connection_;
Destination destination_ *;
MessageProducer * producer_;

STD: : string dest_uri_;
STD: : string broker_uri_;

Bool use_topic_;
Bool client_ack_;
Private:
Void the cleanup (void);
Public:
Lz_producer (const STD: : string& Broker_uri, const STD: : string& Dest_uri,
Bool use_topic=false, bool client_ack=false);
~ lz_producer (void);
Public:
Void, initializer (void);
Bool send_message (const STD: : string& The content);
Bool send_message (const STD: : string& The content of const int priority);
};


The CPP file
 lz_producer: : lz_producer (const STD: : string & amp; Broker_uri, const STD: : string & amp; Dest_uri, 
Bool use_topic, bool client_ack) {
Session_=NULL;
Producer_=NULL;
Connection_=NULL;
Destination_=NULL;

Use_topic_=use_topic;
Client_ack_=client_ack;

Broker_uri_. Assign (broker_uri);
Dest_uri_. Assign (dest_uri);

Activemq: : library: : ActiveMQCPP: : initializeLibrary ();

The LOG (INFO) & lt; <"Lz_producer create," & lt; }

Vanli: : lz_producer: : ~ lz_producer (void) {
The cleanup ();
Activemq: : library: : ActiveMQCPP: : shutdownLibrary ();
}

Void lz_producer: :, initializer (void) {
The LOG (INFO) & lt; <"Lz_producer: :, initializer, & lt;"
Try {
//Create a ConnectionFact
STD: : auto_ptr & lt; ActiveMQConnectionFactory> ConnectionFactory (new ActiveMQConnectionFactory (broker_uri_));
ConnectionFactory - & gt; SetUseAsyncSend (true);
Connection_=connectionFactory - & gt; The createConnection ();
Connection_ - & gt; Start ().

If (client_ack_) {
The LOG (INFO) & lt; <"The Session: : CLIENT_ACKNOWLEDGE";
Session_=connection_ - & gt; CreateSession (Session: : CLIENT_ACKNOWLEDGE);
} else {
//the Session: : AUTO_ACKNOWLEDGE
//the Session: : SESSION_TRANSACTED
The LOG (INFO) & lt; <"The Session: : SESSION_TRANSACTED";
Session_=connection_ - & gt; CreateSession (Session: : SESSION_TRANSACTED);
}
If (use_topic_) {
Destination_=session_ - & gt; CreateTopic (dest_uri_);
} else {
Destination_=session_ - & gt; CreateQueue (dest_uri_);
}
Producer_=session_ - & gt; CreateProducer (destination_);
Producer_ - & gt; SetDeliveryMode (DeliveryMode: : PERSISTENT);
} the catch (CMSException& E) {
The LOG (ERROR) & lt; }
}

Void lz_producer: : cleanup (void) {
Try {
If (destination_!=NULL) {
The delete destination_;
}
} the catch (CMSException & amp; E) {
The LOG (ERROR) & lt; }
Destination_=NULL;
Try {
If (producer_!=NULL) {
The delete producer_;
}
} the catch (CMSException & amp; E) {
The LOG (ERROR) & lt; }
Producer_=NULL;

Try {
If (session_!=NULL) {
Session_ - & gt; Close ();
}
If (connection_!=NULL) {
Connection_ - & gt; Close ();
}
} the catch (CMSException & amp; E) {
The LOG (ERROR) & lt; }

Try {
If (session_!=NULL) {
The delete session_;
}
} the catch (CMSException & amp; E) {
The LOG (ERROR) & lt; }
Session_=NULL;

Try {
If (connection_!=NULL) {
The delete connection_; }
} the catch (CMSException & amp; E) {
The LOG (ERROR) & lt; }
Connection_=NULL;
}

Bool lz_producer: : send_message (const STD: : string & amp; The content) {
Bool ret=false;
Try {
STD: : auto_ptr & lt; CMS: : TextMessage> TextMessage (session_ - & gt; CreateTextMessage ());
TextMessage - & gt; SetText (content);
Producer_ - & gt; Send (textMessage. The get ());
Session_ - & gt; Commit ();
Ret=true;
} the catch (CMSException & amp; E) {
The LOG (ERROR) & lt; <"Error: send_message 1," & lt; }
return ret;
}
  • Related