Cleanup Pending messages using JMX remove message


Cleanup Pending messages using JMX remove message

Hitesh


Hi all,
I have a peculiar use case with ActiveMQ 5.13.

I have two durable subscribers listening on a *topic*. When a particular
event occurs, I have to invalidate the messages and remove them from the
topic. To do this, I connect over JMX, browse the topic, and then remove
each message from each subscriber. If a subscriber is active, I close its
ActiveMQ connection, which was obtained from
org.apache.activemq.pool.PooledConnectionFactory.createConnection().

In this scenario, the pending messages are cleared and the KahaDB disk
usage drops, which is the expected behaviour.

But if I keep my subscribers inactive, the message count stays the same,
and so does the KahaDB disk usage.

This is the code I use to clean up the messages:

    public void startCleanup() throws ExportStagingException {
        try {
            if (topicViewMBean == null) {
                createTopicViewMBean();
            }
            for (String jmsMessageID : getMessageIDs(topicViewMBean.browse())) {
                messagesCount++;
                for (DurableSubscriptionViewMBean subscriber :
                        getSubscriptions(topicViewMBean.getSubscriptions(), connection)) {
                    subscriber.removeMessage(jmsMessageID);
                }
            }
            if (topicViewMBean.browse().length > 0) {
                startCleanup();
            }
            if (messagesCount == 0) {
                logger.info("No messages to delete on Topic: " + topicName + "_" + projectName);
            } else {
                logger.info("Successfully deleted " + messagesCount + " messages from "
                        + topicName + "_" + projectName);
                messagesCount = 0;
            }
        } catch (Exception e) {
            throw new ExportStagingException(e);
        }
    }
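
One property of startCleanup() is worth spelling out: it calls itself for as long as browse() still returns messages, so if the removals have no effect the recursion never ends. A bounded loop is one alternative. The sketch below is only an illustration under stated assumptions and does not use the ActiveMQ API: TopicView, Subscription, browseIds and maxPasses are hypothetical stand-ins for the MBeans above, backed by in-memory lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for the topic and subscription MBeans used above.
class BoundedCleanupSketch {

    interface TopicView { List<String> browseIds(); }

    interface Subscription { void removeMessage(String messageId); }

    /**
     * Repeats browse-and-remove for at most maxPasses passes.
     * Returns the number of distinct message IDs for which removal was attempted.
     */
    static int cleanup(TopicView topic, List<Subscription> subs, int maxPasses) {
        Set<String> attempted = new LinkedHashSet<>();
        for (int pass = 0; pass < maxPasses; pass++) {
            List<String> ids = topic.browseIds();
            if (ids.isEmpty()) {
                break; // nothing left to remove
            }
            for (String id : ids) {
                attempted.add(id);
                for (Subscription sub : subs) {
                    sub.removeMessage(id);
                }
            }
        }
        return attempted.size();
    }

    /** In-memory demo: removal works, so the topic is empty after the first pass. */
    static int demoRemovable() {
        List<String> store = new ArrayList<>(Arrays.asList("ID:1", "ID:2", "ID:3"));
        TopicView topic = () -> new ArrayList<>(store);
        Subscription sub = id -> store.remove(id);
        return cleanup(topic, Collections.singletonList(sub), 5);
    }

    /** In-memory demo: removal is a no-op, yet the loop still stops after maxPasses. */
    static int demoStuck() {
        List<String> store = Arrays.asList("ID:1", "ID:2");
        TopicView topic = () -> store;
        Subscription sub = id -> { };
        return cleanup(topic, Collections.singletonList(sub), 3);
    }
}
```

With a cap on the number of passes, a case where the messages cannot be removed shows up as a non-empty browse result after the loop rather than as a stack overflow.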

The following function collects the JMSMessageID of each message browsed
from the topic:

    private Set<String> getMessageIDs(CompositeData[] messages) {
        Set<String> jmsMessageIDs = new HashSet<>();
        for (CompositeData message : messages) {
            jmsMessageIDs.add((String) message.get(CONST_JMS_MESSAGE_ID));
        }
        return jmsMessageIDs;
    }

And this one fetches the subscription list:

    private List<DurableSubscriptionViewMBean> getSubscriptions(
            ObjectName[] subscriptionNames, MBeanServerConnection conn) {
        if (subscriptions == null) {
            subscriptions = new ArrayList<>();
            for (ObjectName subscriptionName : subscriptionNames) {
                //Creates Subscriber Object and caches it.
                subscriptions.add(
                        MBeanServerInvocationHandler.newProxyInstance(
                                conn,
                                subscriptionName,
                                DurableSubscriptionViewMBean.class,
                                true
                        )
                );
            }
        }
        return subscriptions;
    }
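
Because the list is built once and then reused, any later change in the names passed in is ignored by getSubscriptions(). A variant that builds the proxies on every call could look like the sketch below. It runs against the JVM's own platform MBean server and uses only JDK classes; DemoSubMBean, DemoSub and the demo.sketch object names are hypothetical and merely stand in for ActiveMQ's subscription MBeans.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical demo types; they stand in for ActiveMQ's subscription MBeans.
class ProxySketch {

    /** Hypothetical management interface with one attribute and one operation. */
    public interface DemoSubMBean {
        int getRemovedCount();
        void removeMessage(String messageId);
    }

    /** Trivial implementation; its name follows the Standard MBean convention. */
    public static class DemoSub implements DemoSubMBean {
        private int removed;
        @Override public int getRemovedCount() { return removed; }
        @Override public void removeMessage(String messageId) { removed++; }
    }

    /** Builds a fresh proxy for every name instead of reusing a cached list. */
    static List<DemoSubMBean> freshProxies(MBeanServerConnection conn, ObjectName[] names) {
        List<DemoSubMBean> proxies = new ArrayList<>();
        for (ObjectName name : names) {
            proxies.add(JMX.newMBeanProxy(conn, name, DemoSubMBean.class));
        }
        return proxies;
    }

    /** Registers two demo MBeans locally and returns the removals counted through the proxies. */
    static int demo() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName[] names = {
            new ObjectName("demo.sketch:type=Sub,name=one"),
            new ObjectName("demo.sketch:type=Sub,name=two")
        };
        for (ObjectName name : names) {
            server.registerMBean(new DemoSub(), name);
        }
        try {
            int total = 0;
            for (DemoSubMBean sub : freshProxies(server, names)) {
                sub.removeMessage("ID:example-1");
                total += sub.getRemovedCount();
            }
            return total;
        } finally {
            // Leave the platform MBean server as it was found.
            for (ObjectName name : names) {
                server.unregisterMBean(name);
            }
        }
    }
}
```

Creating a proxy is cheap because it only wraps the connection and the object name, so rebuilding the list on each cleanup trades a little work for never holding stale entries.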


And last but not least, the code that creates the JMX connection and the
topic MBean:

    private MBeanServerConnection createConnection() {
        try {
            if (connection == null) {
                logger.info("Connecting to ActiveMQ JMX Portal");
                String jmxURL = "service:jmx:rmi:///jndi/rmi://" + hostName
                        + ":" + jmxPort + "/jmxrmi";
                connection = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL))
                        .getMBeanServerConnection();
                logger.info("Connected to ActiveMQ JMX Portal");
            }
        } catch (IOException e) {
            logger.error("[" + masterProducer
                    + "] Exception while createConnection ActiveMQ JMX Portal" + e.getMessage());
        }
        return connection;
    }

    private TopicViewMBean createTopicViewMBean() throws ExportStagingException {
        try {
            if (topicViewMBean == null) {
                if (connection == null) {
                    createConnection();
                }
                String brokerObjectName =
                        "org.apache.activemq:type=Broker,brokerName=" + brokerName;
                BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(
                        connection, new ObjectName(brokerObjectName), BrokerViewMBean.class, true);
                // The following for-loop fetches info about the topics available on the Broker.
                boolean topicExists = false;
                for (ObjectName topic : broker.getTopics()) {
                    if (topic.getKeyProperty(CONST_DESTINATION_NAME)
                            .equals(topicName + "_" + projectName)) {
                        topicExists = true;
                        topicViewMBean = MBeanServerInvocationHandler.newProxyInstance(
                                connection, topic, TopicViewMBean.class, true);
                        break;
                    }
                }
                if (!topicExists) {
                    logger.info("Topic " + topicName + " does not exist");
                }
            }
        } catch (MalformedObjectNameException e) {
            logger.error("MalformedObjectNameException while createTopicViewMBean: "
                    + e.getMessage());
            throw new ExportStagingException(e);
        }
        return topicViewMBean;
    }


I've gone through the code to and fro trying to find the missing link, but
all in vain.
Can anyone help me out here?

Thanks :)





--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: Cleanup Pending messages using JMX remove message

Tim Bain
Just to confirm: when your subscribers are all offline, how many objects
are returned in the List from getSubscriptions()?
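
One way to check this is to log the array that topicViewMBean.getSubscriptions() hands to getSubscriptions(), since its length decides how many proxies get created. The sketch below uses only the JDK's ObjectName class; the describe helper and the example.domain names are made up for illustration and are not what a broker would return.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class SubscriptionCountSketch {

    /** Returns a header line with the count, then one line per canonical ObjectName. */
    static List<String> describe(ObjectName[] names) {
        List<String> lines = new ArrayList<>();
        lines.add("subscriptions returned: " + names.length);
        for (ObjectName name : names) {
            lines.add("  " + name.getCanonicalName());
        }
        return lines;
    }

    /** Demo with two made-up object names standing in for real subscription names. */
    static List<String> demo() throws MalformedObjectNameException {
        ObjectName[] names = {
            new ObjectName("example.domain:type=Subscription,name=first"),
            new ObjectName("example.domain:type=Subscription,name=second")
        };
        return describe(names);
    }
}
```

Logging the full names as well as the count also shows which subscriptions the array refers to, which helps when comparing the online and offline cases.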

Tim

On Wed, Oct 18, 2017 at 10:18 AM, Hitesh <
[hidden email]> wrote:


Re: Cleanup Pending messages using JMX remove message

Hitesh
I have 2 durable subscribers, and when they are offline the count is still 2.


