ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

dpatel
        ActiveMQCPP version: 3.7.1
AcitveMQBroker version: 5.10.0

Here is a simple example. The code includes both consumer and producer


// START SNIPPET: demo

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

#include <decaf/util/Random.h>

using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;

#define  QUEUE_NAME    "eventQueue"
#define NAME_BYTE_LEN        16

class HelloWorldProducer : public ExceptionListener,
        public MessageListener,
        public Runnable {
private:
        CountDownLatch latch;
        CountDownLatch doneLatch;
        Connection* connection;
        Session* session;
        Destination* destination;
        MessageProducer* producer;
        int numMessages;
        bool useTopic;
        bool sessionTransacted;
        std::string brokerURI;
        bool bReciveMessage;
        long waitMillis;

private:

        HelloWorldProducer(const HelloWorldProducer&);
        HelloWorldProducer& operator=(const HelloWorldProducer&);

public:

        HelloWorldProducer(const std::string& brokerURI, int numMessages,
bool
useTopic = false, bool sessionTransacted = false,
                long waitMillis = 3000) :
                latch(1),
                doneLatch(numMessages),
                connection(NULL),
                session(NULL),
                destination(NULL),
                producer(NULL),
                numMessages(numMessages),
                useTopic(useTopic),
                sessionTransacted(sessionTransacted),
                brokerURI(brokerURI),
                bReciveMessage(false),
                waitMillis(waitMillis)
        { }

        virtual ~HelloWorldProducer() {
                cleanup();
        }

        void close() {
                this->cleanup();
        }

        void waitUntilReady() {
                latch.await();
        }

        virtual void run() {

                try {

                        // Create a ConnectionFactory
                        auto_ptr<ConnectionFactory> connectionFactory(
                               
ConnectionFactory::createCMSConnectionFactory(brokerURI));

                        // Create a Connection
                        connection = connectionFactory->createConnection();
                        connection->start();

                        // Create a Session
                        if (this->sessionTransacted) {
                                session =
connection->createSession(Session::SESSION_TRANSACTED);
                        }
                        else {
                                session =
connection->createSession(Session::AUTO_ACKNOWLEDGE);
                        }

                        session = connection->createSession();
                        // Create the destination (Topic or Queue)
                        if (useTopic) {
                                destination =
session->createTopic(QUEUE_NAME);
                        }
                        else {
                                destination =
session->createQueue(QUEUE_NAME);
                        }

                        // Create a MessageProducer from the Session to the
Topic or Queue
                        producer = session->createProducer(destination);
                       
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

                        // Create the Thread Id String
                        string threadIdStr =
Long::toString(Thread::currentThread()->getId());

                        // Create a messages
                        string text = (string) "Hello world! from thread " +
threadIdStr;

                        for (int ix = 0; ix < numMessages; ++ix) {
                                std::auto_ptr<TextMessage>
message(session->createTextMessage(text));

                                //????...
                                std::auto_ptr<Destination>
tempDest(session->createTemporaryQueue());

                                //cms::Destination
tempDest=session->createTemporaryTopic() ;
                                MessageConsumer * responseConsumer =
session->createConsumer(tempDest.get());
                               
responseConsumer->setMessageListener(this);//??...


                                message->setCMSReplyTo(tempDest.get());
                                Random random;
                                char buffer[NAME_BYTE_LEN] = { 0 };
                                random.nextBytes((unsigned char *)buffer,
NAME_BYTE_LEN);
                                string correlationId = "";
                                for (int i = 0; i < NAME_BYTE_LEN; ++i)
                                {
                                        char ch[NAME_BYTE_LEN * 2] = { 0 };
                                        sprintf(ch, "%02X", (unsigned
char)buffer[i]);
                                        string str(ch);

                                        correlationId += str;
                                }

                                message->setCMSCorrelationID(correlationId);

                                message->setIntProperty("Integer", ix);
                                printf("Producer Sent message #%d from
thread %s\n", ix + 1,
threadIdStr.c_str());
                                producer->send(message.get());

                                // Indicate we are ready for messages.
                                latch.countDown();

                                // Wait while asynchronous messages come in.
                                doneLatch.await(waitMillis);

                        }
                }
                catch (CMSException& e) {
                        printf("Producer run() CMSException \n");
                        // Indicate we are ready for messages.
                        latch.countDown();
                        e.printStackTrace();
                }


        }


        // Called from the Producer since this class is a registered
MessageListener.
        virtual void onMessage(const Message* message) {

                static int count = 0;

                try {
                        count++;
                        const TextMessage* textMessage = dynamic_cast<const
TextMessage*>
(message);
                        //ActiveMQMessageTransformation
                        //std::auto_ptr<TextMessage>
responsemessage(session->createTextMessage());
               
//responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());
                        //responsemessage->getCMSReplyTo()

                        string text = "";

                        if (textMessage != NULL) {
                                text = textMessage->getText();
                        }
                        else {
                                text = "NOT A TEXTMESSAGE!";
                        }

                        printf("Producer Message #%d Received: %s\n", count,
text.c_str());


                        //producer.send

                }
                catch (CMSException& e) {
                        printf("Producer onMessage() CMSException \n");
                        e.printStackTrace();
                }

                // Commit all messages.
                if (this->sessionTransacted) {
                        session->commit();
                }

                // No matter what, tag the count down latch until done.
                doneLatch.countDown();
        }

        // If something bad happens you see it here as this class is also
been
        // registered as an ExceptionListener with the connection.
        virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
                printf("Producer onException() CMS Exception occurred.
Shutting down
client. \n");
                ex.printStackTrace();
                exit(1);
        }


private:

        void cleanup() {

                if (connection != NULL) {
                        try {
                                connection->close();
                        }
                        catch (cms::CMSException& ex) {
                                ex.printStackTrace();
                        }
                }

                // Destroy resources.
                try {
                        delete destination;
                        destination = NULL;
                        delete producer;
                        producer = NULL;
                        delete session;
                        session = NULL;
                        delete connection;
                        connection = NULL;
                }
                catch (CMSException& e) {
                        e.printStackTrace();
                }
        }
};

class HelloWorldConsumer : public ExceptionListener,
        public MessageListener,
        public Runnable {

private:

        CountDownLatch latch;
        CountDownLatch doneLatch;
        Connection* connection;
        Session* session;
        Destination* destination;
        MessageConsumer* consumer;
        MessageProducer *producer;
        long waitMillis;
        bool useTopic;
        bool sessionTransacted;
        std::string brokerURI;

private:

        HelloWorldConsumer(const HelloWorldConsumer&);
        HelloWorldConsumer& operator=(const HelloWorldConsumer&);

public:

        HelloWorldConsumer(const std::string& brokerURI, int numMessages,
bool
useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :
                latch(1),
                doneLatch(numMessages),
                connection(NULL),
                session(NULL),
                destination(NULL),
                consumer(NULL),
                producer(NULL),
                waitMillis(waitMillis),
                useTopic(useTopic),
                sessionTransacted(sessionTransacted),
                brokerURI(brokerURI) {
        }

        virtual ~HelloWorldConsumer() {
                cleanup();
        }

        void close() {
                this->cleanup();
        }

        void waitUntilReady() {
                latch.await();
        }

        virtual void run() {

                try {

                        // Create a ConnectionFactory
                        auto_ptr<ConnectionFactory> connectionFactory(
                               
ConnectionFactory::createCMSConnectionFactory(brokerURI));

                        // Create a Connection
                        connection = connectionFactory->createConnection();
                        connection->start();
                        connection->setExceptionListener(this);

                        // Create a Session
                        if (this->sessionTransacted == true) {
                                session =
connection->createSession(Session::SESSION_TRANSACTED);
                        }
                        else {
                                session =
connection->createSession(Session::AUTO_ACKNOWLEDGE);
                        }

                        // Create the destination (Topic or Queue)
                        if (useTopic) {
                                destination =
session->createTopic(QUEUE_NAME);
                        }
                        else {
                                destination =
session->createQueue(QUEUE_NAME);
                        }

                        producer = session->createProducer();
                       
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

                        // Create a MessageConsumer from the Session to the
Topic or Queue
                        consumer = session->createConsumer(destination);

                        consumer->setMessageListener(this);

                        std::cout.flush();
                        std::cerr.flush();

                        // Indicate we are ready for messages.
                        latch.countDown();

                        // Wait while asynchronous messages come in.
                        doneLatch.await();

                }
                catch (CMSException& e) {
                        printf("Consumer onException() CMS Exception
occurred.  Shutting down
client. \n");
                        // Indicate we are ready for messages.
                        latch.countDown();
                        e.printStackTrace();
                }
        }

        // Called from the consumer since this class is a registered
MessageListener.
        virtual void onMessage(const Message* message) {

                static int count = 0;

                try {
                        count++;


                        // Create the Thread Id String
                        string threadIdStr =
Long::toString(Thread::currentThread()->getId());

                        static bool bPrintf = true;
                        if (bPrintf)
                        {
                                bPrintf = false;
                                printf("consumer Message threadid: %s\n",
threadIdStr.c_str());
                        }

                        string strReply = "consumer return  xxx,ThreadID=" +
threadIdStr;
                        const TextMessage* textMessage = dynamic_cast<const
TextMessage*>
(message);

                        if (NULL == textMessage)
                        {
                                printf("NULL==textMessage %s",
message->getCMSType().c_str());


                                //const cms::MapMessage* mapMsg =
dynamic_cast<const
cms::MapMessage*>(message);
                                //if(mapMsg)
                                //{
                                //    
                                //    std::vector<std::string> elements =
mapMsg->getMapNames();
                                //    std::vector<std::string>::iterator
iter = elements.begin();
                                //    for(; iter != elements.end() ; ++iter)
                                //    {
                                //        std::string key = *iter;
                                //        cms::Message::ValueType
elementType =
mapMsg->getValueType(key);
                                //        string strxxx;
                                //        int cc=0;
                                //        switch(elementType) {
                                //    case cms::Message::BOOLEAN_TYPE:
                                //        //msg->setBoolean(key,
mapMsg->getBoolean(key));
                                //        break;
                                //    case cms::Message::BYTE_TYPE:
                                //        //msg->setByte(key,
mapMsg->getByte(key));
                                //        break;
                                //    case cms::Message::BYTE_ARRAY_TYPE:
                                //        //msg->setBytes(key,
mapMsg->getBytes(key));
                                //        break;
                                //    case cms::Message::CHAR_TYPE:
                                //        //msg->setChar(key,
mapMsg->getChar(key));
                                //        break;
                                //    case cms::Message::SHORT_TYPE:
                                //        //msg->setShort(key,
mapMsg->getShort(key));
                                //        break;
                                //    case cms::Message::INTEGER_TYPE:
                                //        //msg->setInt(key,
mapMsg->getInt(key));
                                //        break;
                                //    case cms::Message::LONG_TYPE:
                                //        //msg->setLong(key,
mapMsg->getLong(key));
                                //        break;
                                //    case cms::Message::FLOAT_TYPE:
                                //        //msg->setFloat(key,
mapMsg->getFloat(key));
                                //        break;
                                //    case cms::Message::DOUBLE_TYPE:
                                //        //msg->setDouble(key,
mapMsg->getDouble(key));
                                //        break;
                                //    case cms::Message::STRING_TYPE:
                                //        //msg->setString(key,
mapMsg->getString(key));
                                //        strxxx=mapMsg->getString(key);
                                //        cc=1;
                                //        break;
                                //    default:
                                //        break;
                                //        }
                                //    }

                                //}

                                return;
                        }

                        std::auto_ptr<TextMessage>
responsemessage(session->createTextMessage(strReply));
                       
responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());


                        string text = "";

                        if (textMessage != NULL) {
                                text = textMessage->getText();
                        }
                        else {
                                text = "NOT A TEXTMESSAGE!";
                        }

                        int nProPerty =
textMessage->getIntProperty("Integer");
                        printf("consumer Message #%d Received:
%s,nProPerty[%d]\n", count,
text.c_str(), nProPerty);


                        const cms::Destination* destSend =
textMessage->getCMSReplyTo();
                        if (destSend)
                        {
                                this->producer->send(destSend,
responsemessage.get());

                                printf("consumer Message #%d send: %s\n",
count, strReply.c_str());
                        }


                }
                catch (CMSException& e) {
                        printf("Consumer onMessage() CMS Exception occurred.
Shutting down
client. \n");
                        e.printStackTrace();
                }

                // Commit all messages.
                if (this->sessionTransacted) {
                        session->commit();
                }

                // No matter what, tag the count down latch until done.
                //doneLatch.countDown();
        }

        // If something bad happens you see it here as this class is also
been
        // registered as an ExceptionListener with the connection.
        virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
                printf("Consumer onException() CMS Exception occurred.
Shutting down
client. \n");
                //printf("CMS Exception occurred.  Shutting down
client.\n");
                ex.printStackTrace();
                exit(1);
        }

private:

        void cleanup() {
                if (connection != NULL) {
                        try {
                                connection->close();
                        }
                        catch (cms::CMSException& ex) {
                                ex.printStackTrace();
                        }
                }

                // Destroy resources.
                try {
                        delete destination;
                        destination = NULL;
                        delete consumer;
                        consumer = NULL;
                        delete session;
                        session = NULL;
                        delete connection;
                        connection = NULL;
                }
                catch (CMSException& e) {
                        e.printStackTrace();
                }
        }
};

int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

        //if(argc<2)
        //{
        //    printf("argc<2\r\n");
        //    return 0;
        //}

        activemq::library::ActiveMQCPP::initializeLibrary();
        {
                std::cout <<
"=====================================================\n";
                std::cout << "Starting the example:" << std::endl;
                std::cout <<
"-----------------------------------------------------\n";


                // Set the URI to point to the IP Address of your broker.
                // add any optional params to the url to enable things like
                // tightMarshalling or tcp logging etc.  See the CMS web
site for
                // a full list of configuration options.
                //
                //  http://activemq.apache.org/cms/
                //
                // Wire Format Options:
                // =========================
                // Use either stomp or openwire, the default ports are
different for each
                //
                // Examples:
                //    tcp://127.0.0.1:61616                      default to
openwire
                //    tcp://127.0.0.1:61616?wireFormat=openwire  same as
above
                //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp
instead
                //
                // SSL:
                // =========================
                // To use SSL you need to specify the location of the
trusted Root CA or
the
                // certificate for the broker you want to connect to.  Using
the Root CA
allows
                // you to use failover with multiple servers all using
certificates signed
by
                // the trusted root.  If using client authentication you
also need to
specify
                // the location of the client Certificate.
                //
                //     System::setProperty( "decaf.net.ssl.keyStore",
"<path>/client.pem"
);
                //     System::setProperty(
"decaf.net.ssl.keyStorePassword", "password"
);
                //     System::setProperty( "decaf.net.ssl.trustStore",
"<path>/rootCA.pem" );
                //
                // The you just specify the ssl transport in the URI, for
example:
                //
                //     ssl://localhost:61617
                //
                std::string brokerURI =
"tcp://127.0.0.1:61616?jms.watchTopicAdvisories=false";
                       
               
//============================================================
                // set to true to use topics instead of queues
                // Note in the code above that this causes createTopic or
                // createQueue to be used in both consumer an producer.
               
//============================================================
                bool useTopics = false;
                bool sessionTransacted = true;
                int numMessages = 1;
                bool useConsumer = true;
                bool useProducer = true;

                //int nSet=atoi(argv[1]);
                //if(1==nSet)
                //{
                //#define USE_COMSUMER


                //}
                //else
                //{
                //#define USE_PRODUCER

                //
                //}



                long long startTime = System::currentTimeMillis();

#ifdef USE_PRODUCER
                printf("?? USE_PRODUCER \r\n");

                int numProducerMessages = 30;
                int nThreadNumber = 10;
                vector<HelloWorldProducer *> vHelloWorldProducer;
                for (int i = 0; i < nThreadNumber; ++i)
                {
                        HelloWorldProducer * producerTemp = new
HelloWorldProducer(brokerURI,
numProducerMessages, useTopics);
                        vHelloWorldProducer.push_back(producerTemp);
                }

#endif

#ifdef USE_COMSUMER
                printf("?? USE_COMSUMER \r\n");
                HelloWorldConsumer consumer(brokerURI, numMessages,
useTopics,
sessionTransacted);
                // Start the consumer thread.
                Thread consumerThread(&consumer);
                consumerThread.start();

                // Wait for the consumer to indicate that its ready to go.
                consumer.waitUntilReady();

#endif




#ifdef USE_PRODUCER
                // Start the producer thread.

                vector<Thread *> vThread;
                for (int i = 0; i < nThreadNumber; ++i)
                {
                        HelloWorldProducer & ProducerTemp =
*vHelloWorldProducer[i];
                        Thread * threadTemp = new Thread(&ProducerTemp);
                        vThread.push_back(threadTemp);
                        threadTemp->start();
                        ProducerTemp.waitUntilReady();

                }

                for (size_t i = 0; i < vThread.size(); ++i)
                {
                        Thread * threadTemp = vThread[i];
                        //threadTemp->join();
                }
                while (1)
                {
                        Thread::sleep(10);
                }

                //Thread producerThread1(&producer1);
                //producerThread1.start();
                //producer1.waitUntilReady();

                //Thread producerThread2(&producer2);
                //producerThread2.start();
                //producer2.waitUntilReady();

                //Thread producerThread3(&producer3);
                //producerThread3.start();
                //producer3.waitUntilReady();
#endif




#ifdef USE_PRODUCER
                // Wait for the threads to complete.
                //producerThread1.join();
                //producerThread2.join();
                //producerThread3.join();
#endif

#ifdef USE_COMSUMER
                consumerThread.join();
#endif

                long long endTime = System::currentTimeMillis();
                double totalTime = (double)(endTime - startTime) / 1000.0;

#ifdef USE_PRODUCER
                //producer1.close();
                //producer2.close();
                //producer3.close();

                for (size_t i = 0; i < vHelloWorldProducer.size(); ++i)
                {
                        HelloWorldProducer * ProducerTemp =
vHelloWorldProducer[i];
                        ProducerTemp->close();

                        if (ProducerTemp)
                        {
                                delete ProducerTemp;
                                ProducerTemp = NULL;
                        }
                }

#endif
#ifdef USE_COMSUMER
                consumer.close();
#endif




                std::cout << "Time to completion = " << totalTime << "
seconds." <<
std::endl;
                std::cout <<
"-----------------------------------------------------\n";
                std::cout << "Finished with the example." << std::endl;
                std::cout <<
"=====================================================\n";

        }
        activemq::library::ActiveMQCPP::shutdownLibrary();


        return 0;
}

When I run the above example producer and consumer following happens:

1. Producer is able to put the message on the queue.
2. Consumer is able to retrieve the message from the queues.
3. When consumer tries to send response back using replyTo desitnation, the
send fails with error message listed above.

On the broker and consumer/producer I have advisorySupport turned OFF. When
I turn them on this work fine.

What I would like to know:
1. How can I make the error go away but still have advisorySupport off.

Thanks a lot for your help.

P.S-- Attaching my activemq.xml.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
Reply | Threaded
Open this post in threaded view
|

Re: ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

Tim Bain
Is this in a single broker or a network of brokers? If the latter, does it
work for a single broker without advisory messages enabled?

Also, your activemq.xml didn't make it.

Tim

On Sun, Sep 16, 2018, 2:06 AM dpatel <[hidden email]> wrote:

>         ActiveMQCPP version: 3.7.1
> AcitveMQBroker version: 5.10.0
>
> Here is a simple example. The code includes both consumer and producer
>
>
> // START SNIPPET: demo
>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <decaf/lang/Integer.h>
> #include <decaf/lang/Long.h>
> #include <decaf/lang/System.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Config.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> #include <stdio.h>
> #include <iostream>
> #include <memory>
>
> #include <decaf/util/Random.h>
>
> using namespace activemq::core;
> using namespace decaf::util::concurrent;
> using namespace decaf::util;
> using namespace decaf::lang;
> using namespace cms;
> using namespace std;
>
> #define  QUEUE_NAME    "eventQueue"
> #define NAME_BYTE_LEN        16
>
> class HelloWorldProducer : public ExceptionListener,
>         public MessageListener,
>         public Runnable {
> private:
>         CountDownLatch latch;
>         CountDownLatch doneLatch;
>         Connection* connection;
>         Session* session;
>         Destination* destination;
>         MessageProducer* producer;
>         int numMessages;
>         bool useTopic;
>         bool sessionTransacted;
>         std::string brokerURI;
>         bool bReciveMessage;
>         long waitMillis;
>
> private:
>
>         HelloWorldProducer(const HelloWorldProducer&);
>         HelloWorldProducer& operator=(const HelloWorldProducer&);
>
> public:
>
>         HelloWorldProducer(const std::string& brokerURI, int numMessages,
> bool
> useTopic = false, bool sessionTransacted = false,
>                 long waitMillis = 3000) :
>                 latch(1),
>                 doneLatch(numMessages),
>                 connection(NULL),
>                 session(NULL),
>                 destination(NULL),
>                 producer(NULL),
>                 numMessages(numMessages),
>                 useTopic(useTopic),
>                 sessionTransacted(sessionTransacted),
>                 brokerURI(brokerURI),
>                 bReciveMessage(false),
>                 waitMillis(waitMillis)
>         { }
>
>         virtual ~HelloWorldProducer() {
>                 cleanup();
>         }
>
>         void close() {
>                 this->cleanup();
>         }
>
>         void waitUntilReady() {
>                 latch.await();
>         }
>
>         virtual void run() {
>
>                 try {
>
>                         // Create a ConnectionFactory
>                         auto_ptr<ConnectionFactory> connectionFactory(
>
> ConnectionFactory::createCMSConnectionFactory(brokerURI));
>
>                         // Create a Connection
>                         connection =
> connectionFactory->createConnection();
>                         connection->start();
>
>                         // Create a Session
>                         if (this->sessionTransacted) {
>                                 session =
> connection->createSession(Session::SESSION_TRANSACTED);
>                         }
>                         else {
>                                 session =
> connection->createSession(Session::AUTO_ACKNOWLEDGE);
>                         }
>
>                         session = connection->createSession();
>                         // Create the destination (Topic or Queue)
>                         if (useTopic) {
>                                 destination =
> session->createTopic(QUEUE_NAME);
>                         }
>                         else {
>                                 destination =
> session->createQueue(QUEUE_NAME);
>                         }
>
>                         // Create a MessageProducer from the Session to the
> Topic or Queue
>                         producer = session->createProducer(destination);
>
> producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
>
>                         // Create the Thread Id String
>                         string threadIdStr =
> Long::toString(Thread::currentThread()->getId());
>
>                         // Create a messages
>                         string text = (string) "Hello world! from thread "
> +
> threadIdStr;
>
>                         for (int ix = 0; ix < numMessages; ++ix) {
>                                 std::auto_ptr<TextMessage>
> message(session->createTextMessage(text));
>
>                                 //????...
>                                 std::auto_ptr<Destination>
> tempDest(session->createTemporaryQueue());
>
>                                 //cms::Destination
> tempDest=session->createTemporaryTopic() ;
>                                 MessageConsumer * responseConsumer =
> session->createConsumer(tempDest.get());
>
> responseConsumer->setMessageListener(this);//??...
>
>
>                                 message->setCMSReplyTo(tempDest.get());
>                                 Random random;
>                                 char buffer[NAME_BYTE_LEN] = { 0 };
>                                 random.nextBytes((unsigned char *)buffer,
> NAME_BYTE_LEN);
>                                 string correlationId = "";
>                                 for (int i = 0; i < NAME_BYTE_LEN; ++i)
>                                 {
>                                         char ch[NAME_BYTE_LEN * 2] = { 0
> };
>                                         sprintf(ch, "%02X", (unsigned
> char)buffer[i]);
>                                         string str(ch);
>
>                                         correlationId += str;
>                                 }
>
>
> message->setCMSCorrelationID(correlationId);
>
>                                 message->setIntProperty("Integer", ix);
>                                 printf("Producer Sent message #%d from
> thread %s\n", ix + 1,
> threadIdStr.c_str());
>                                 producer->send(message.get());
>
>                                 // Indicate we are ready for messages.
>                                 latch.countDown();
>
>                                 // Wait while asynchronous messages come
> in.
>                                 doneLatch.await(waitMillis);
>
>                         }
>                 }
>                 catch (CMSException& e) {
>                         printf("Producer run() CMSException \n");
>                         // Indicate we are ready for messages.
>                         latch.countDown();
>                         e.printStackTrace();
>                 }
>
>
>         }
>
>
>         // Called from the Producer since this class is a registered
> MessageListener.
>         virtual void onMessage(const Message* message) {
>
>                 static int count = 0;
>
>                 try {
>                         count++;
>                         const TextMessage* textMessage = dynamic_cast<const
> TextMessage*>
> (message);
>                         //ActiveMQMessageTransformation
>                         //std::auto_ptr<TextMessage>
> responsemessage(session->createTextMessage());
>
> //responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());
>
>                         //responsemessage->getCMSReplyTo()
>
>                         string text = "";
>
>                         if (textMessage != NULL) {
>                                 text = textMessage->getText();
>                         }
>                         else {
>                                 text = "NOT A TEXTMESSAGE!";
>                         }
>
>                         printf("Producer Message #%d Received: %s\n",
> count,
> text.c_str());
>
>
>                         //producer.send
>
>                 }
>                 catch (CMSException& e) {
>                         printf("Producer onMessage() CMSException \n");
>                         e.printStackTrace();
>                 }
>
>                 // Commit all messages.
>                 if (this->sessionTransacted) {
>                         session->commit();
>                 }
>
>                 // No matter what, tag the count down latch until done.
>                 doneLatch.countDown();
>         }
>
>         // If something bad happens you see it here as this class is also
> been
>         // registered as an ExceptionListener with the connection.
>         virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
>                 printf("Producer onException() CMS Exception occurred.
> Shutting down
> client. \n");
>                 ex.printStackTrace();
>                 exit(1);
>         }
>
>
> private:
>
>         void cleanup() {
>
>                 if (connection != NULL) {
>                         try {
>                                 connection->close();
>                         }
>                         catch (cms::CMSException& ex) {
>                                 ex.printStackTrace();
>                         }
>                 }
>
>                 // Destroy resources.
>                 try {
>                         delete destination;
>                         destination = NULL;
>                         delete producer;
>                         producer = NULL;
>                         delete session;
>                         session = NULL;
>                         delete connection;
>                         connection = NULL;
>                 }
>                 catch (CMSException& e) {
>                         e.printStackTrace();
>                 }
>         }
> };
>
> class HelloWorldConsumer : public ExceptionListener,
>         public MessageListener,
>         public Runnable {
>
> private:
>
>         CountDownLatch latch;
>         CountDownLatch doneLatch;
>         Connection* connection;
>         Session* session;
>         Destination* destination;
>         MessageConsumer* consumer;
>         MessageProducer *producer;
>         long waitMillis;
>         bool useTopic;
>         bool sessionTransacted;
>         std::string brokerURI;
>
> private:
>
>         HelloWorldConsumer(const HelloWorldConsumer&);
>         HelloWorldConsumer& operator=(const HelloWorldConsumer&);
>
> public:
>
>         HelloWorldConsumer(const std::string& brokerURI, int numMessages,
> bool
> useTopic = false, bool sessionTransacted = false, int waitMillis = 30000)
> :
>                 latch(1),
>                 doneLatch(numMessages),
>                 connection(NULL),
>                 session(NULL),
>                 destination(NULL),
>                 consumer(NULL),
>                 producer(NULL),
>                 waitMillis(waitMillis),
>                 useTopic(useTopic),
>                 sessionTransacted(sessionTransacted),
>                 brokerURI(brokerURI) {
>         }
>
>         virtual ~HelloWorldConsumer() {
>                 cleanup();
>         }
>
>         void close() {
>                 this->cleanup();
>         }
>
>         void waitUntilReady() {
>                 latch.await();
>         }
>
>         virtual void run() {
>
>                 try {
>
>                         // Create a ConnectionFactory
>                         auto_ptr<ConnectionFactory> connectionFactory(
>
> ConnectionFactory::createCMSConnectionFactory(brokerURI));
>
>                         // Create a Connection
>                         connection =
> connectionFactory->createConnection();
>                         connection->start();
>                         connection->setExceptionListener(this);
>
>                         // Create a Session
>                         if (this->sessionTransacted == true) {
>                                 session =
> connection->createSession(Session::SESSION_TRANSACTED);
>                         }
>                         else {
>                                 session =
> connection->createSession(Session::AUTO_ACKNOWLEDGE);
>                         }
>
>                         // Create the destination (Topic or Queue)
>                         if (useTopic) {
>                                 destination =
> session->createTopic(QUEUE_NAME);
>                         }
>                         else {
>                                 destination =
> session->createQueue(QUEUE_NAME);
>                         }
>
>                         producer = session->createProducer();
>
> producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
>
>                         // Create a MessageConsumer from the Session to the
> Topic or Queue
>                         consumer = session->createConsumer(destination);
>
>                         consumer->setMessageListener(this);
>
>                         std::cout.flush();
>                         std::cerr.flush();
>
>                         // Indicate we are ready for messages.
>                         latch.countDown();
>
>                         // Wait while asynchronous messages come in.
>                         doneLatch.await();
>
>                 }
>                 catch (CMSException& e) {
>                         printf("Consumer onException() CMS Exception
> occurred.  Shutting down
> client. \n");
>                         // Indicate we are ready for messages.
>                         latch.countDown();
>                         e.printStackTrace();
>                 }
>         }
>
>         // Called from the consumer since this class is a registered
> MessageListener.
>         virtual void onMessage(const Message* message) {
>
>                 static int count = 0;
>
>                 try {
>                         count++;
>
>
>                         // Create the Thread Id String
>                         string threadIdStr =
> Long::toString(Thread::currentThread()->getId());
>
>                         static bool bPrintf = true;
>                         if (bPrintf)
>                         {
>                                 bPrintf = false;
>                                 printf("consumer Message threadid: %s\n",
> threadIdStr.c_str());
>                         }
>
>                         string strReply = "consumer return  xxx,ThreadID="
> +
> threadIdStr;
>                         const TextMessage* textMessage = dynamic_cast<const
> TextMessage*>
> (message);
>
>                         if (NULL == textMessage)
>                         {
>                                 printf("NULL==textMessage %s",
> message->getCMSType().c_str());
>
>
>                                 //const cms::MapMessage* mapMsg =
> dynamic_cast<const
> cms::MapMessage*>(message);
>                                 //if(mapMsg)
>                                 //{
>                                 //
>                                 //    std::vector<std::string> elements =
> mapMsg->getMapNames();
>                                 //    std::vector<std::string>::iterator
> iter = elements.begin();
>                                 //    for(; iter != elements.end() ;
> ++iter)
>                                 //    {
>                                 //        std::string key = *iter;
>                                 //        cms::Message::ValueType
> elementType =
> mapMsg->getValueType(key);
>                                 //        string strxxx;
>                                 //        int cc=0;
>                                 //        switch(elementType) {
>                                 //    case cms::Message::BOOLEAN_TYPE:
>                                 //        //msg->setBoolean(key,
> mapMsg->getBoolean(key));
>                                 //        break;
>                                 //    case cms::Message::BYTE_TYPE:
>                                 //        //msg->setByte(key,
> mapMsg->getByte(key));
>                                 //        break;
>                                 //    case cms::Message::BYTE_ARRAY_TYPE:
>                                 //        //msg->setBytes(key,
> mapMsg->getBytes(key));
>                                 //        break;
>                                 //    case cms::Message::CHAR_TYPE:
>                                 //        //msg->setChar(key,
> mapMsg->getChar(key));
>                                 //        break;
>                                 //    case cms::Message::SHORT_TYPE:
>                                 //        //msg->setShort(key,
> mapMsg->getShort(key));
>                                 //        break;
>                                 //    case cms::Message::INTEGER_TYPE:
>                                 //        //msg->setInt(key,
> mapMsg->getInt(key));
>                                 //        break;
>                                 //    case cms::Message::LONG_TYPE:
>                                 //        //msg->setLong(key,
> mapMsg->getLong(key));
>                                 //        break;
>                                 //    case cms::Message::FLOAT_TYPE:
>                                 //        //msg->setFloat(key,
> mapMsg->getFloat(key));
>                                 //        break;
>                                 //    case cms::Message::DOUBLE_TYPE:
>                                 //        //msg->setDouble(key,
> mapMsg->getDouble(key));
>                                 //        break;
>                                 //    case cms::Message::STRING_TYPE:
>                                 //        //msg->setString(key,
> mapMsg->getString(key));
>                                 //        strxxx=mapMsg->getString(key);
>                                 //        cc=1;
>                                 //        break;
>                                 //    default:
>                                 //        break;
>                                 //        }
>                                 //    }
>
>                                 //}
>
>                                 return;
>                         }
>
>                         std::auto_ptr<TextMessage>
> responsemessage(session->createTextMessage(strReply));
>
> responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());
>
>
>                         string text = "";
>
>                         if (textMessage != NULL) {
>                                 text = textMessage->getText();
>                         }
>                         else {
>                                 text = "NOT A TEXTMESSAGE!";
>                         }
>
>                         int nProPerty =
> textMessage->getIntProperty("Integer");
>                         printf("consumer Message #%d Received:
> %s,nProPerty[%d]\n", count,
> text.c_str(), nProPerty);
>
>
>                         const cms::Destination* destSend =
> textMessage->getCMSReplyTo();
>                         if (destSend)
>                         {
>                                 this->producer->send(destSend,
> responsemessage.get());
>
>                                 printf("consumer Message #%d send: %s\n",
> count, strReply.c_str());
>                         }
>
>
>                 }
>                 catch (CMSException& e) {
>                         printf("Consumer onMessage() CMS Exception
> occurred.
> Shutting down
> client. \n");
>                         e.printStackTrace();
>                 }
>
>                 // Commit all messages.
>                 if (this->sessionTransacted) {
>                         session->commit();
>                 }
>
>                 // No matter what, tag the count down latch until done.
>                 //doneLatch.countDown();
>         }
>
>         // If something bad happens you see it here as this class is also
> been
>         // registered as an ExceptionListener with the connection.
>         virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
>                 printf("Consumer onException() CMS Exception occurred.
> Shutting down
> client. \n");
>                 //printf("CMS Exception occurred.  Shutting down
> client.\n");
>                 ex.printStackTrace();
>                 exit(1);
>         }
>
> private:
>
>         void cleanup() {
>                 if (connection != NULL) {
>                         try {
>                                 connection->close();
>                         }
>                         catch (cms::CMSException& ex) {
>                                 ex.printStackTrace();
>                         }
>                 }
>
>                 // Destroy resources.
>                 try {
>                         delete destination;
>                         destination = NULL;
>                         delete consumer;
>                         consumer = NULL;
>                         delete session;
>                         session = NULL;
>                         delete connection;
>                         connection = NULL;
>                 }
>                 catch (CMSException& e) {
>                         e.printStackTrace();
>                 }
>         }
> };
>
> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
>
>         //if(argc<2)
>         //{
>         //    printf("argc<2\r\n");
>         //    return 0;
>         //}
>
>         activemq::library::ActiveMQCPP::initializeLibrary();
>         {
>                 std::cout <<
> "=====================================================\n";
>                 std::cout << "Starting the example:" << std::endl;
>                 std::cout <<
> "-----------------------------------------------------\n";
>
>
>                 // Set the URI to point to the IP Address of your broker.
>                 // add any optional params to the url to enable things
> like
>                 // tightMarshalling or tcp logging etc.  See the CMS web
> site for
>                 // a full list of configuration options.
>                 //
>                 //  http://activemq.apache.org/cms/
>                 //
>                 // Wire Format Options:
>                 // =========================
>                 // Use either stomp or openwire, the default ports are
> different for each
>                 //
>                 // Examples:
>                 //    tcp://127.0.0.1:61616                      default
> to
> openwire
>                 //    tcp://127.0.0.1:61616?wireFormat=openwire  same as
> above
>                 //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp
> instead
>                 //
>                 // SSL:
>                 // =========================
>                 // To use SSL you need to specify the location of the
> trusted Root CA or
> the
>                 // certificate for the broker you want to connect to.
> Using
> the Root CA
> allows
>                 // you to use failover with multiple servers all using
> certificates signed
> by
>                 // the trusted root.  If using client authentication you
> also need to
> specify
>                 // the location of the client Certificate.
>                 //
>                 //     System::setProperty( "decaf.net.ssl.keyStore",
> "<path>/client.pem"
> );
>                 //     System::setProperty(
> "decaf.net.ssl.keyStorePassword", "password"
> );
>                 //     System::setProperty( "decaf.net.ssl.trustStore",
> "<path>/rootCA.pem" );
>                 //
>                 // The you just specify the ssl transport in the URI, for
> example:
>                 //
>                 //     ssl://localhost:61617
>                 //
>                 std::string brokerURI =
> "tcp://127.0.0.1:61616?jms.watchTopicAdvisories=false";
>
>
> //============================================================
>                 // set to true to use topics instead of queues
>                 // Note in the code above that this causes createTopic or
>                 // createQueue to be used in both consumer an producer.
>
> //============================================================
>                 bool useTopics = false;
>                 bool sessionTransacted = true;
>                 int numMessages = 1;
>                 bool useConsumer = true;
>                 bool useProducer = true;
>
>                 //int nSet=atoi(argv[1]);
>                 //if(1==nSet)
>                 //{
>                 //#define USE_COMSUMER
>
>
>                 //}
>                 //else
>                 //{
>                 //#define USE_PRODUCER
>
>                 //
>                 //}
>
>
>
>                 long long startTime = System::currentTimeMillis();
>
> #ifdef USE_PRODUCER
>                 printf("?? USE_PRODUCER \r\n");
>
>                 int numProducerMessages = 30;
>                 int nThreadNumber = 10;
>                 vector<HelloWorldProducer *> vHelloWorldProducer;
>                 for (int i = 0; i < nThreadNumber; ++i)
>                 {
>                         HelloWorldProducer * producerTemp = new
> HelloWorldProducer(brokerURI,
> numProducerMessages, useTopics);
>                         vHelloWorldProducer.push_back(producerTemp);
>                 }
>
> #endif
>
> #ifdef USE_COMSUMER
>                 printf("?? USE_COMSUMER \r\n");
>                 HelloWorldConsumer consumer(brokerURI, numMessages,
> useTopics,
> sessionTransacted);
>                 // Start the consumer thread.
>                 Thread consumerThread(&consumer);
>                 consumerThread.start();
>
>                 // Wait for the consumer to indicate that its ready to go.
>                 consumer.waitUntilReady();
>
> #endif
>
>
>
>
> #ifdef USE_PRODUCER
>                 // Start the producer thread.
>
>                 vector<Thread *> vThread;
>                 for (int i = 0; i < nThreadNumber; ++i)
>                 {
>                         HelloWorldProducer & ProducerTemp =
> *vHelloWorldProducer[i];
>                         Thread * threadTemp = new Thread(&ProducerTemp);
>                         vThread.push_back(threadTemp);
>                         threadTemp->start();
>                         ProducerTemp.waitUntilReady();
>
>                 }
>
>                 for (size_t i = 0; i < vThread.size(); ++i)
>                 {
>                         Thread * threadTemp = vThread[i];
>                         //threadTemp->join();
>                 }
>                 while (1)
>                 {
>                         Thread::sleep(10);
>                 }
>
>                 //Thread producerThread1(&producer1);
>                 //producerThread1.start();
>                 //producer1.waitUntilReady();
>
>                 //Thread producerThread2(&producer2);
>                 //producerThread2.start();
>                 //producer2.waitUntilReady();
>
>                 //Thread producerThread3(&producer3);
>                 //producerThread3.start();
>                 //producer3.waitUntilReady();
> #endif
>
>
>
>
> #ifdef USE_PRODUCER
>                 // Wait for the threads to complete.
>                 //producerThread1.join();
>                 //producerThread2.join();
>                 //producerThread3.join();
> #endif
>
> #ifdef USE_COMSUMER
>                 consumerThread.join();
> #endif
>
>                 long long endTime = System::currentTimeMillis();
>                 double totalTime = (double)(endTime - startTime) / 1000.0;
>
> #ifdef USE_PRODUCER
>                 //producer1.close();
>                 //producer2.close();
>                 //producer3.close();
>
>                 for (size_t i = 0; i < vHelloWorldProducer.size(); ++i)
>                 {
>                         HelloWorldProducer * ProducerTemp =
> vHelloWorldProducer[i];
>                         ProducerTemp->close();
>
>                         if (ProducerTemp)
>                         {
>                                 delete ProducerTemp;
>                                 ProducerTemp = NULL;
>                         }
>                 }
>
> #endif
> #ifdef USE_COMSUMER
>                 consumer.close();
> #endif
>
>
>
>
>                 std::cout << "Time to completion = " << totalTime << "
> seconds." <<
> std::endl;
>                 std::cout <<
> "-----------------------------------------------------\n";
>                 std::cout << "Finished with the example." << std::endl;
>                 std::cout <<
> "=====================================================\n";
>
>         }
>         activemq::library::ActiveMQCPP::shutdownLibrary();
>
>
>         return 0;
> }
>
> When I run the above example producer and consumer following happens:
>
> 1. Producer is able to put the message on the queue.
> 2. Consumer is able to retrieve the message from the queues.
> 3. When consumer tries to send response back using replyTo desitnation,
> the
> send fails with error message listed above.
>
> On the broker and consumer/producer I have advisorySupport turned OFF.
> When
> I turn them on this work fine.
>
> What I would like to know:
> 1. How can I make the error go away but still have advisorySupport off.
>
> Thanks a lot for your help.
>
> P.S-- Attaching my activemq.xml.
>
>
>
> --
> Sent from:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
>
Reply | Threaded
Open this post in threaded view
|

Re: ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

dpatel
This is on a single broker. Here is the activemq xml again. activemq.xml
<http://activemq.2283324.n4.nabble.com/file/t375533/activemq.xml>  



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
Reply | Threaded
Open this post in threaded view
|

Re: ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

dpatel
In reply to this post by Tim Bain
Can you help. Not sure how to proceed here as with advisorySupport on my
broker is running out of memory.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
Reply | Threaded
Open this post in threaded view
|

Re: ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

artnaseef
I suspect you may be running into the problem that the client keeps track of
known temporary destinations on the broker, and immediately rejects attempts
to produce to temporary destinations that the client doesn't know about,
even if they exist on the broker (although that's not the intent).

This includes both a race condition and I suspect just fails when advisories
are disabled on the broker; the race condition is due to the following:
The client needs to receive the temp destination creation advisory before
attempting to send a message to that temporary destination; otherwise the
"Cannot publish to a deleted Destination" exception is thrown - by the
client library, not the broker
That advisory is sent out-of-band (i.e. asynchronously)
With that said, the watchTopicAdvisories setting on the client can disable
this feature (which I believe is intended to be an optimization).

tcp://localhost:61616?jms.watchTopicAdvisories=false

See this page for some details:
http://activemq.apache.org/advisory-message.html

Art

P.S. This is my third attempt to reply. Apologies for any duplicates.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
Reply | Threaded
Open this post in threaded view
|

Re: ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

dpatel
I did try to connect with the ?jms.watchTopicAdvisories=false in the uri.
That didn't fix it. I ran my example with activemqcpp code. I found that
that on the consumer side when I call

MessageProducer->send(replyTodestination, message)

ultimately it ends up calling

ActiveMQSessionKernel::send() in which it checks for isDeleted(). IsDeleted
checks the mValue map inside the options object in the producers connection
object. That map is empty which causes the "Cannot publish to deleted
destination message".

I dug a little further. The only way destinations can be added to the map is
by calling addTemporaryDestination() function on the option class. That
function is only called with advisory message or when creating a
temporaryQueue.

As per my understanding TempQueue will never work on the client side without
advisory messages. Am I missing something? If I am wrong please help
understand what I need to do to fix the message.



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
Reply | Threaded
Open this post in threaded view
|

Re: ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

tabish121@gmail.com
On 09/18/2018 09:38 PM, dpatel wrote:
> I did try to connect with the ?jms.watchTopicAdvisories=false in the uri.

connection.watchTopicAdvisories=false

--
Tim Bish

Reply | Threaded
Open this post in threaded view
|

Re: ActiveMQCPP--Cannot publish to a deleted destinatination: temp-queue:

dpatel