[HELP] How to make persistent Topic after activemq server restart?


liny
Hi,

Sorry, I searched the forum but still cannot get this to work.
What works now: after the ActiveMQ server starts, I send a topic message, and a newly connected client receives that last message because of "lastImageSubscriptionRecoveryPolicy" and "topic://OTD.METRICS?consumer.retroactive=true".
What does NOT work: the topic message is not stored across an ActiveMQ server restart.
So after a restart no last message is kept, and a client that connects gets nothing.
What I want is simple: the last topic message should still be available after ActiveMQ restarts.
Could you kindly help or share any ideas?
Anything is appreciated!

Below is my broker configuration:
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>      
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="taxsrv01" dataDirectory="${activemq.base}/data" useShutdownHook="true" persistent="true" deleteAllMessagesOnStartup="false" schedulerSupport="false" schedulePeriodForDestinationPurge="60000">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry topic="OTD.METRICS" producerFlowControl="true" memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                  <subscriptionRecoveryPolicy>
                    <lastImageSubscriptionRecoveryPolicy/>
                  </subscriptionRecoveryPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb" gcInactiveDestinations="true" inactiveTimoutBeforeGC="120000">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy> 
 
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterClientsOnRemove="true"/>
        </transportConnectors>

    </broker>

    <import resource="jetty.xml"/>
    
</beans>

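Below is my test application:
    // Needs: java.util.Date, javax.jms.*, org.apache.activemq.ActiveMQConnectionFactory
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://xxx.com:61616");
    Connection c = factory.createConnection();
    try {
        Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination dest = s.createTopic("OTD.METRICS");
        MessageProducer producer = s.createProducer(dest);
        // Mark the message as persistent
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // The payload is just the current timestamp
        producer.send(s.createTextMessage("" + new Date()));
    } finally {
        // Closing the connection also closes its session and producer
        c.close();
    }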

Thanks for reading!

Re: [HELP] How to make persistent Topic after activemq server restart?

liny
Hi,

My ActiveMQ server version is 5.5.1.
I was told that a durable consumer should connect to the ActiveMQ server first.
So I ran the following test:
1. Create durable consumer X and connect it to the ActiveMQ server. X appears under "Active Durable Topic Subscribers" in the Web Console.
2. Use the Web Console to send a persistent message to the "OTD.METRICS" topic.
3. Durable consumer X receives the new message.
4. Restart the ActiveMQ server.
5. Create another durable consumer Y. Y appears under "Active Durable Topic Subscribers" in the Web Console.
6. Durable consumer Y does not receive the last message.

My question is:
I set "lastImageSubscriptionRecoveryPolicy" on the topic, so why is the last message not kept?

My code to create the consumer is:
    c = ((JmsHelper) getApplication()).getJmsConnection();
    long dttm = System.currentTimeMillis();

    c.setClientID("SiteLevelOtdMetrics-clientID-anderson");
    c.start();
    Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic dest = s.createTopic("OTD.METRICS?consumer.retroactive=true");
    // The subscription name includes a timestamp, so every run creates a new durable subscription
    TopicSubscriber scriber = s.createDurableSubscriber(dest, "MyDurableScubscriber-scriberName-" + dttm);
    scriber.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message msg) {
            handleMessage(msg);
        }
    });

Any hints are appreciated!

Re: [HELP] How to make persistent Topic after activemq server restart?

liny
In reply to this post by liny
Hi,

I have read http://activemq.apache.org/how-do-durable-queues-and-topics-work.html.
It says:
For example imagine a durable subscriber S starts up subscribing to topic T at time D1. 
Some publisher sends messages M1, M2, M3 to the topic and S will receive each of these messages. 
Then S is stopped and the publisher continues to send M4, M5.

When S restarts at D2, the publisher sends M6 and M7. 
Now S will receive M4, M5 followed by M6 and M7 and all future messages. i.e. S will receive all messages from M1..M7.
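
For illustration, the scenario quoted above can be modelled with a small, self-contained Java sketch. It is a toy model, not ActiveMQ code: the class `DurableTopicModel` and its methods are hypothetical names invented for this example. It also adds a second subscription Y, registered only after M7 was published, to show why a durable subscription that is created late starts with no backlog.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a topic with durable subscriptions (hypothetical, not ActiveMQ API). */
public class DurableTopicModel {

    /** State the broker keeps per durable subscription, online or not. */
    private static final class Subscription {
        final Deque<String> pending = new ArrayDeque<>(); // stored while offline
        final List<String> received = new ArrayList<>();  // delivered so far
        boolean online;
    }

    private final Map<String, Subscription> subs = new LinkedHashMap<>();

    /** Registers the subscription on first use, then drains its backlog in order. */
    public void connect(String name) {
        Subscription s = subs.computeIfAbsent(name, k -> new Subscription());
        s.online = true;
        while (!s.pending.isEmpty()) {
            s.received.add(s.pending.poll());
        }
    }

    /** The subscription stays registered; only the consumer goes away. */
    public void disconnect(String name) {
        subs.get(name).online = false;
    }

    /** Delivers to online subscriptions and stores for offline ones. */
    public void publish(String msg) {
        for (Subscription s : subs.values()) {
            if (s.online) {
                s.received.add(msg);
            } else {
                s.pending.add(msg);
            }
        }
    }

    public List<String> received(String name) {
        return subs.get(name).received;
    }

    public static void main(String[] args) {
        DurableTopicModel topic = new DurableTopicModel();
        topic.connect("S");                       // time D1
        topic.publish("M1");
        topic.publish("M2");
        topic.publish("M3");
        topic.disconnect("S");                    // S is stopped
        topic.publish("M4");
        topic.publish("M5");
        topic.connect("S");                       // time D2: backlog M4, M5 arrives first
        topic.publish("M6");
        topic.publish("M7");
        topic.connect("Y");                       // Y did not exist when M1..M7 were sent
        System.out.println(topic.received("S"));  // [M1, M2, M3, M4, M5, M6, M7]
        System.out.println(topic.received("Y"));  // []
    }
}
```

In this model nothing is ever stored on behalf of Y before it first connects, which is why durability alone does not give a new subscriber the last message.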

My requirement is that S should not get M4 and M5.
S only needs to get M7, the last message.
A new subscriber should also get M7 when it connects to the ActiveMQ server, even after the server has been restarted.
Does anyone have any ideas to share?
Thank you very much.

Re: [HELP] How to make persistent Topic after activemq server restart?

gtully
This looks like something that is missing. There is no
org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy
that will peek at the topic store; they are all memory based, so they
don't survive a restart.

What is needed is a
org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy
that can query the topic store. This needs an enhancement. Can you
raise a JIRA issue to track this?
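
To illustrate the gap, here is a minimal Java sketch contrasting a memory-based last-image policy with one that queries the store. Everything in it (`LastImageModel`, `TopicStore`, `RecoveryPolicy`, `InMemoryLastImage`, `StoreBackedLastImage`, `Broker`) is a hypothetical name for this example, not the ActiveMQ API, and it assumes for simplicity that every published message reaches the store. A "restart" is modelled as creating a new broker object over the same store.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of last-image recovery across a restart (hypothetical, not ActiveMQ API). */
public class LastImageModel {

    /** Stands in for the persistent topic store; it outlives any Broker instance. */
    static final class TopicStore {
        private final List<String> messages = new ArrayList<>();

        void append(String msg) {
            messages.add(msg);
        }

        String last() {
            return messages.isEmpty() ? null : messages.get(messages.size() - 1);
        }
    }

    interface RecoveryPolicy {
        void onPublish(String msg);

        /** Message replayed to a new retroactive subscriber, or null if none. */
        String recover();
    }

    /** Keeps the last message on the heap only, so it is lost with the broker. */
    static final class InMemoryLastImage implements RecoveryPolicy {
        private String last;

        public void onPublish(String msg) {
            last = msg;
        }

        public String recover() {
            return last;
        }
    }

    /** Assumed enhancement: asks the store instead of caching in memory. */
    static final class StoreBackedLastImage implements RecoveryPolicy {
        private final TopicStore store;

        StoreBackedLastImage(TopicStore store) {
            this.store = store;
        }

        public void onPublish(String msg) {
            // Nothing to cache; the store already holds the message.
        }

        public String recover() {
            return store.last();
        }
    }

    static final class Broker {
        private final TopicStore store;
        private final RecoveryPolicy policy;

        Broker(TopicStore store, boolean storeBacked) {
            this.store = store;
            this.policy = storeBacked ? new StoreBackedLastImage(store) : new InMemoryLastImage();
        }

        void publish(String msg) {
            store.append(msg);
            policy.onPublish(msg);
        }

        String subscribeRetroactive() {
            return policy.recover();
        }
    }

    public static void main(String[] args) {
        TopicStore disk = new TopicStore();

        Broker beforeRestart = new Broker(disk, false);
        beforeRestart.publish("M7");
        System.out.println(beforeRestart.subscribeRetroactive()); // M7

        Broker afterRestart = new Broker(disk, false);            // same store, fresh memory
        System.out.println(afterRestart.subscribeRetroactive());  // null

        Broker enhanced = new Broker(disk, true);                 // policy reads the store
        System.out.println(enhanced.subscribeRetroactive());      // M7
    }
}
```

The design point is only where the "last image" lives: a field on the policy object disappears on restart, while a lookup against the store does not.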


On 12 March 2012 06:28, liny <[hidden email]> wrote:

> Hi,
>
> I had read
> http://activemq.apache.org/how-do-durable-queues-and-topics-work.html
> how-do-durable-queues-and-topics-work .
> It says
>
>
> My request is I don't S to get M4 and M5.
> S just need to get M7 - the last message.
> New subscriber needs to get M7 when connected to activemq server even after
> activemq server restarted.
> Would anyone can share any idea?
> Thank you very much.



--
http://fusesource.com
http://blog.garytully.com

Re: [HELP] How to make persistent Topic after activemq server restart?

liny
Hi~

I submitted my first issue at https://issues.apache.org/jira/browse/AMQ-3766.
I hope we can have this feature soon.
Thank you, Gary and all.