Home > other >  ActiveMQ communication//why HelloWorldConsumer don't get the message?
ActiveMQ communication//why HelloWorldConsumer don't get the message?

Time:09-26

 
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
You * The ASF licenses this file under The Apache License, Version 2.0
* (the "License"); You may not use this file except in the compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS is" BASIS,
* WITHOUT WARRANTIES OR the CONDITIONS OF ANY KIND, either express OR implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//START the SNIPPET: demo
# include & lt; Activemq/library/ActiveMQCPP. H>
# include & lt; Decaf/lang/Thread. H>
# include & lt; Decaf/lang/Runnable. H>
# include & lt; Decaf/util/concurrent/CountDownLatch. H>
# include & lt; Decaf/lang/Integer. H>
# include & lt; Decaf/lang/Long. H>
# include & lt; Decaf/lang/System. H>
# include & lt; Activemq/core/ActiveMQConnectionFactory. H>
# include & lt; Activemq/util/Config. H>
# include & lt; CMS/Connection. H>
# include & lt; CMS/Session. H>
# include & lt; CMS/TextMessage. H>
# include & lt; CMS/BytesMessage. H>
# include & lt; CMS/MapMessage. H>
# include & lt; CMS/ExceptionListener. H>
# include & lt; CMS/MessageListener. H>
# include & lt; Stdlib. H>
# include & lt; Stdio. H>
# include & lt; iostream>
# include & lt; Memory>

Using the namespace activemq: : core;
Using the namespace decaf: : util: : concurrent;
Using the namespace decaf: : util;
Using the namespace decaf: : lang;
Using the namespace CMS;
using namespace std;

The class HelloWorldProducer: public Runnable {
Private:

The Connection * Connection;
The Session * Session;
Destination * Destination;
MessageProducer * producer;
Int numMessages;
Bool useTopic;
Bool sessionTransacted;
STD: : string brokerURI;

Private:

HelloWorldProducer (const HelloWorldProducer&);
HelloWorldProducer& Operator=(const HelloWorldProducer&);

Public:

HelloWorldProducer (const STD: : string& BrokerURI, int numMessages, bool useTopic=false, bool sessionTransacted=false) :
Connection (NULL),
The session (NULL),
Destination (NULL),
Producer (NULL),
NumMessages (numMessages),
UseTopic (useTopic),
SessionTransacted (sessionTransacted),
BrokerURI (brokerURI) {
}

Virtual ~ HelloWorldProducer () {
The cleanup ();
}

Void the close () {
This - & gt; The cleanup ();
}

Virtual void the run () {

Try {

//Create a ConnectionFactory
Auto_ptr & lt; ConnectionFactory> ConnectionFactory (
ConnectionFactory: : createCMSConnectionFactory (brokerURI));

//Create a Connection
The connection=connectionFactory - & gt; The createConnection ();
Connection - & gt; start();

//Create a Session
If (this - & gt; SessionTransacted) {
The session=connection - & gt; CreateSession (Session: : SESSION_TRANSACTED);
} else {
The session=connection - & gt; CreateSession (Session: : AUTO_ACKNOWLEDGE);
}

//Create the destination (the Topic or Queue)
Destination=session - & gt; CreateQueue (" TEST. The FOO ");

//Create a MessageProducer from the Session to the Topic or Queue
Producer=session - & gt; CreateProducer (destination);
Producer - & gt; SetDeliveryMode (DeliveryMode: : NON_PERSISTENT);

//Create the Thread Id String
String threadIdStr=Long: : toString (Thread: : currentThread () - & gt; The getId ());

//Create a messages
String text=(string) "Hello world! The from thread "+ threadIdStr;

For (int x=0; Ix & lt; NumMessages; + + ix) {
STD: : auto_ptr & lt; TextMessage> The message (the session - & gt; CreateTextMessage (text));
The message - & gt; SetIntProperty (" Integer, "ix);
Printf (" Sent message # % d from thread % s \ n ", ix + 1, threadIdStr. C_str ());
Producer - & gt; Send (message. The get ());
}

} the catch (CMSException& E) {
e.printStackTrace();
}
}

Private:

Void the cleanup () {

If (connection!=NULL) {
Try {
Connection - & gt; close();
} the catch (CMS: : CMSException& The ex) {
ex.printStackTrace();
}
}

//Destroy resources.
Try {
The delete destination;
Destination=NULL;
Delete the producer;
Producer=NULL;
Delete the session.
The session=NULL;
Delete the connection;
The connection=NULL;
} the catch (CMSException& E) {
e.printStackTrace();
}
}
};

The class HelloWorldConsumer: public ExceptionListener,
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull
  • Related