included: fully functional (almost) quote producer

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

included: fully functional (almost) quote producer

millerkdm
If someone would have posted something like this, I would have saved weeks of work.

1. This is a fully-functional quote publisher  (with  quote objects, db stuff, and a few other things  
removed).
2. It has an embedded broker.
3. When quotes come in, it will publish quotes only if there is a subscriber for a quote.
4. It will remove publishers when there are no longer clients subscribed to a topic.
5. It sets memory and prefetch limits.
6. It evicts messages for slow consumers.
7. It includes a temporary topic request for the most recent quote, and replies to only the single requesting consumer.


Any recommendations on how to improve would be appreciated.

enjoy.



package server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import messages.BarsRequestMessage;
import messages.QuoteRequestMessage;
import messages.TicksRequestMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicPublisher;
import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.ActiveMQTopicSubscriber;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.CacheEntryList;
import org.apache.activemq.memory.CacheEvictionUsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.log4j.Logger;

public class QuoteProducer implements MessageListener
{
        protected static final DateFormat dateTimeFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
        protected static final String tcpURL = "tcp://localhost:3000";
        protected static final String vmURL = "vm://localhost:4000";
        static Logger logger = Logger.getLogger(QuoteProducer.class);
        protected ActiveMQConnection connection = null;
        protected ActiveMQTopicSession session = null;
        protected HashMap<String, ActiveMQTopicPublisher> topicMap;
        protected BrokerService broker;
        protected InetAddress multiCastReceiveAddress = null;
        protected int multiCastReceivePort;
        protected MulticastSocket multiCastReceiveSocket = null;
        protected QuoteListener feedThread;
       
        public QuoteProducer()
        {
                try
                {
                        //PropertyConfigurator.configure("log4j.properties");
                       
                        topicMap = new HashMap<String, ActiveMQTopicPublisher>();
                        broker = createBroker();
                        connection = createConnection();
                        session = createSession(connection);
                        createTopicsListener();
                        createTemporaryTopicListener();
                        feedThread = new QuoteListener("238.0.0.9", 12344);
                }
                catch (Exception e)
                {
                        logger.error(e);
                        e.printStackTrace();
                }
        }
       
        BrokerService createBroker()
        {
                BrokerService b = null;
                try
                {
                        b = new BrokerService();
                        PolicyEntry policy = new PolicyEntry();
                        ConstantPendingMessageLimitStrategy limitStrategy = new ConstantPendingMessageLimitStrategy();
                        limitStrategy.setLimit(1000);
                        policy.setPendingMessageLimitStrategy(limitStrategy);
                        policy.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
                policy.setSendAdvisoryIfNoConsumers(true);

                        PolicyMap pMap = new PolicyMap();
                        pMap.setDefaultEntry(policy);
                       
                        b .setDestinationPolicy(pMap);
                        b.setPersistent(false);
                        b .setUseJmx(false);
                       
                        UsageManager um = broker.getMemoryManager();
                        um.setLimit(1024 * 1024 * 200);
                        CacheEvictionUsageListener ceul = new CacheEvictionUsageListener(um, 90, 80, broker.getTaskRunnerFactory());
                       
                        CacheEntryList cel = new CacheEntryList();
                        ceul.add(cel.createFIFOCacheEvictor());
                        b.getMemoryManager().addUsageListener(ceul);
                        b.setDeleteAllMessagesOnStartup(true);
                        b.addConnector(tcpURL);
                        b.addConnector(vmURL);
                        b.start();
                }
                catch (Exception e)
                {
                        logger.error(e);
                        e.printStackTrace();
                }
               
                return b;
        }
       
        protected void publishToTopic(String topic, ActiveMQObjectMessage message)
        {
                try
                {
                        ActiveMQTopicPublisher publisher = topicMap.get(topic);
                        if (publisher == null)
                                return;
                        publisher.publish(message);
                }
                catch (JMSException e)
                {
                        logger.error(e);
                        e.printStackTrace();
                }
        }

        protected boolean subscribersForTopic(String topic)
        {
                return (topicMap.containsKey(topic));
        }

        protected void addSubscription(String topic)
        {
                if (topic.startsWith("TEMP") || topic.startsWith("ID")|| subscribersForTopic(topic))
                        return;
               
                synchronized (topicMap)
                {
                        ActiveMQTopicPublisher publisher = null;
                        try
                        {
                                publisher = createPublisher(session, topic);
                                topicMap.put(topic, publisher);
                                System.out.println("added topic: " + topic);
                        }
                        catch (JMSException e)
                        {
                                logger.error(e.getStackTrace());
                        }
                }
        }

        protected void removeSubscription(String topic)
        {
                synchronized (topicMap)
                {
                        if (!subscribersForTopic(topic))
                                return;

                        ActiveMQTopicPublisher publisher = topicMap.remove(topic);
                        if (publisher != null)
                        {
                                try
                                {
                                        publisher.close();
                                        System.out.println("removed topic: " + topic);
                                }
                                catch (JMSException e)
                                {
                                        logger.error(e.getStackTrace());
                                }
                        }
                }
        }
       
        protected ActiveMQConnection createConnection() throws JMSException, Exception
        {
                ActiveMQConnection c = null;
                try
                {
                        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
                        prefetchPolicy.setMaximumPendingMessageLimit(1);
                        prefetchPolicy.setTopicPrefetch(1);
                        prefetchPolicy.setMaximumPendingMessageLimit(10);

                        ActiveMQConnection con = null;
                        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("Data Server",
                                        AuthenticatingBroker.serverPassword, vmURL);
                        connectionFactory.setAlwaysSessionAsync(true);
                        connectionFactory.setUseAsyncSend(true);
                        connectionFactory.setOptimizeAcknowledge(true);
                        connectionFactory.setDisableTimeStampsByDefault(true);
                        connectionFactory.setCopyMessageOnSend(false);
                        connectionFactory.setUseCompression(true);
                        connectionFactory.setPrefetchPolicy(prefetchPolicy);
                        con = (ActiveMQConnection) connectionFactory.createConnection();

                        con.start();
                        return con;
                }
                catch (Exception e)
                {
                        logger.error(e);
                        e.printStackTrace();
                }
                return c;
        }
       
        protected ActiveMQTopicSession createSession(ActiveMQConnection connection) throws Exception
        {
                ActiveMQTopicSession ses = (ActiveMQTopicSession) connection.createTopicSession(false,
                                ActiveMQSession.AUTO_ACKNOWLEDGE);
                return ses;
        }
       
        protected ActiveMQTopicPublisher createPublisher(ActiveMQTopicSession session, String msgTopic) throws JMSException
        {
                ActiveMQTopic destination = (ActiveMQTopic) session.createTopic(msgTopic);
                ActiveMQTopicPublisher publisher = (ActiveMQTopicPublisher) session.createPublisher(destination);
                publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                publisher.setDisableMessageID(true);
                publisher.setDisableMessageTimestamp(true);
                return publisher;
        }
       
        protected void createTemporaryTopicListener()
        {
                try
                {
                        String tempTopic = "TEMP MESSAGE";
                        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(tempTopic);

                        ActiveMQTopicSubscriber tempSubscriber = (ActiveMQTopicSubscriber) session
                                .createSubscriber(topic);
                        tempSubscriber.setMessageListener(this);
                       
                }
                catch (JMSException e)
                {
                        logger.error(e);
                        e.printStackTrace();
                }
        }
        protected void createTopicsListener()
        {
                try
                {
                        String msgTopic = ">"; // listen for advisories on all topics
                        ActiveMQTopic allTopics = (ActiveMQTopic) session.createTopic(msgTopic);
                       
                        ActiveMQTopic noConsumerTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber noConsumerSubscriber = (ActiveMQTopicSubscriber) session
                                        .createSubscriber(noConsumerTopic);
                        noConsumerSubscriber.setMessageListener(this);

                        ActiveMQTopic consumerTopic = AdvisorySupport.getConsumerAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber consumerSubscriber = (ActiveMQTopicSubscriber) session
                                        .createSubscriber(consumerTopic);
                        consumerSubscriber.setMessageListener(this);

                        /*
                        ActiveMQTopic producerTopic = AdvisorySupport.getProducerAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber producerSubscriber = (ActiveMQTopicSubscriber) session
                                        .createSubscriber(producerTopic);
                        producerSubscriber.setMessageListener(this);

                       
                        ActiveMQTopic connectionTopic = AdvisorySupport.getConnectionAdvisoryTopic();
                        ActiveMQTopicSubscriber connectionSubscriber = (ActiveMQTopicSubscriber) session
                                        .createSubscriber(connectionTopic);
                        connectionSubscriber.setMessageListener(this);
                        ActiveMQTopic destinationTopic = AdvisorySupport.getDestinationAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber destinationSubscriber = (ActiveMQTopicSubscriber) session
                                        .createSubscriber(destinationTopic);
                        destinationSubscriber.setMessageListener(this);
                        */
                }
                catch (JMSException e)
                {
                        logger.error(e);
                        e.printStackTrace();
                }
        }
       
        public static boolean isNoConsumerAdvisoryTopic(ActiveMQDestination destination)
        {
                if (destination.isComposite())
                {
                        ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
                        for (int i = 0; i < compositeDestinations.length; i++)
                        {
                                if (isNoConsumerAdvisoryTopic(compositeDestinations[i]))
                                {
                                        return true;
                                }
                        }
                        return false;
                }
                else
                {
                        return destination.isTopic()
                                        && destination.getPhysicalName().startsWith(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX);
                }
        }
       
        public static ActiveMQTopic getNoConsumerAdvisoryTopic(ActiveMQDestination destination)
        {
                return new ActiveMQTopic(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName());
        }
       
        public static String getNoConsumerAdvisoryTopic(String topic)
        {
                String ret = topic.substring(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX.length());
                return ret;
        }
        public static String getConsumerAdvisoryTopic(String topic)
        {
                String ret = topic.substring(AdvisorySupport.TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX.length());
                return ret;
        }
       
        public void onMessage(Message message)
        {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                ActiveMQTopic destination = (ActiveMQTopic) activeMessage.getDestination();
                System.out.println(destination);
               
                if (AdvisorySupport.isAdvisoryTopic(destination))
                {
                        if (AdvisorySupport.isConsumerAdvisoryTopic(destination))
                        {
                                Object command = activeMessage.getDataStructure();
                                if (command != null)
                                {
                                        if (command instanceof ConsumerInfo)
                                        {
                                                String topic = getConsumerAdvisoryTopic(destination.getPhysicalName());
                                                addSubscription(topic);
                                        }
                                }
                        }
                        else if (isNoConsumerAdvisoryTopic(destination))
                        {
                                String topic = getNoConsumerAdvisoryTopic(destination.getPhysicalName());
                                removeSubscription(topic);
                        }
                }
                else if (activeMessage instanceof ActiveMQObjectMessage)
                { //for out temorary request topics
                        try
                        {
                                if (requestMessage.getObject() instanceof QuoteRequestMessage)
                                {
                                        new QuoteMessageRequester(requestMessage);
                                }
                        }
                        catch (JMSException e)
                        {
                                logger.error(e);
                                e.printStackTrace();
                                return;
                        }
                }
        }
       
        class QuoteListener extends Thread
        {
                InetAddress multiCastReceiveAddress;
                int multiCastReceivePort;
                MulticastSocket multiCastReceiveSocket;
                DatagramPacket packet;

                public QuoteListener(String receiveAddress, int receivePort)
                {
                        try
                        {
                                this.multiCastReceiveAddress = InetAddress.getByName(receiveAddress);
                        }
                        catch (UnknownHostException e)
                        {
                                logger.error(e);
                                e.printStackTrace();
                        }
                        this.multiCastReceivePort = receivePort;
                        try
                        {
                                multiCastReceiveSocket = new MulticastSocket(multiCastReceivePort);
                                multiCastReceiveSocket.setTimeToLive(5);
                                multiCastReceiveSocket.joinGroup(multiCastReceiveAddress);
                        }
                        catch (Exception e)
                        {
                                logger.error(e);
                                e.printStackTrace();
                        }
                        byte[] message = new byte[1000];
                        packet = new DatagramPacket(message, message.length);
                        start();
                }
               
                public void run()
                {
                        try
                        {
                                while (true)
                                {
                                        multiCastReceiveSocket.receive(packet);
                                        //long start = System.currentTimeMillis();
                                        processPacket();
                                        //long end = System.currentTimeMillis() - start;
                                        //System.out.println("End of wait for " + end + " millis");
                                }
                        }
                        catch (Exception e)
                        {
                                e.printStackTrace();
                        }
                }
               
                public void processPacket()
                {
                        try
                        {
                                String topic = "TEST.QUOTE";
                                if (!subscribersForTopic(topic))
                                        return;

                                String pretendObject= new String(topic + " " + "stuff");
                                ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage)
                                                session.createObjectMessage();
                                objectMessage.setObject(pretendObject);
                                if (topic.length() > 0 && objectMessage != null)
                                                        publishToTopic(topic, objectMessage);
                        }
                        catch (Exception e)
                        {
                                System.out.println(e.getStackTrace());
                        }
                }
        }

        class QuoteMessageRequester extends Thread
        {
                ActiveMQObjectMessage requestMessage;
                QuoteMessageRequester(ActiveMQObjectMessage requestMessage)
                {
                        this.requestMessage = requestMessage;
                        start();
                }
                public void run()
                {
                        try
                        {
                                String pretendObject= new String("Quote  " + "stuff");
                                ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage)
                                                session.createObjectMessage();
                                objectMessage.setObject(pretendObject);
                               
                                sendReply(requestMessage, pretendObject);
                        }
                        catch (Exception e)
                        {
                                logger.error(e);
                                e.printStackTrace();
                                return;
                        }
                }
        }


        public synchronized void sendReply(Message in, Serializable out)
        {
                try
                {
                        ActiveMQConnection con = createConnection();
                        ActiveMQTopicSession ses = createSession(con);
                        ActiveMQObjectMessage replyMessage = (ActiveMQObjectMessage) ses.createObjectMessage(out);
                        replyMessage.setJMSCorrelationID(in.getJMSMessageID());
                        Destination replyDestination = in.getJMSReplyTo();
                        ActiveMQTopicPublisher pub = (ActiveMQTopicPublisher) ses.createPublisher((Topic) replyDestination);
                        pub.publish(replyMessage);
                        pub.close();
                        ses.close();
                }
                catch (Exception e)
                {
                        logger.error(e);
                        e.printStackTrace();
                        return;
                }
        }
        public static void main(String[] args)
        {
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                String str;
                System.out.println("Enter 'quit' to exit\n");
                do
                {
                        str = "";
                        try
                        {
                                str = br.readLine().toUpperCase();
                        }
                        catch (IOException e)
                        {
                                e.printStackTrace();
                        }
                }
                while (!str.equals("quit"));
                System.exit(0);
        }
}
Reply | Threaded
Open this post in threaded view
|

Hang during consume

Adrian Tarau-2
Several times, I got a problem with the ActiveMQ(4.0.1), consuming a
message hanged and after I restarted the application everything was fine.
The stack trace is bellow, the waiting is in ActiveMQMessageConsumer.close.

Did anybody got a similar problem? Thanks.



at java.lang.Object.wait(Native Method)
    - waiting on <0xe9d93cc8> (a
edu.emory.mathcs.backport.java.util.concurrent.locks.CondVar)
    at java.lang.Object.wait(Object.java:474)
    at
edu.emory.mathcs.backport.java.util.concurrent.locks.CondVar.await(CondVar.java:75)
    - locked <0xe9d93cc8> (a
edu.emory.mathcs.backport.java.util.concurrent.locks.CondVar)
    at
edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:318)
    at
org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:38)
    at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:74)
    at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1112)
    at
org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
    at
org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:516)
Reply | Threaded
Open this post in threaded view
|

Re: included: fully functional (almost) quote producer

James Strachan-2
In reply to this post by millerkdm
Looks cool! :)

One way you could improve it is just using a single MessageProducer to
send messages; if you create a JMS MessageProducer passing in null for
the destination, you can then specify the destination each time you
send a message. Then you can just keep around a Map of Destinations
indexed by String - rather than having a full JMS MessageProducer for
each destination.

Another thought is - you might want to enable last image subscription
recovery; so whenever someone subscribes to a quote, they immediately
get the last image sent to them before any more updates are received.

http://incubator.apache.org/activemq/subscription-recovery-policy.html

I wonder if something vaguely like a quote server would be a useful
utility add-on to distribute with ActiveMQ? I can imagine quite a few
folks wishing to bridge some kind of quotes/market data to an ActiveMQ
network


On 10/20/06, millerkdm <[hidden email]> wrote:

>
> If someone would have posted something like this, I would have saved weeks of
> work.
>
> 1.      This is a fully-functional quote publisher  (with  quote objects, db
> stuff, and a few other things
> removed).
> 2.      It has an embedded broker.
> 3.      When quotes come in, it will publish quotes only if there is a subscriber
> for a quote.
> 4.      It will remove publishers when there are no longer clients subscribed to
> a topic.
> 5.      It sets memory and prefetch limits.
> 6.      It evicts messages for slow consumers.
> 7.      It includes a temporary topic request for the most recent quote, and
> replies to only the single requesting consumer.
>
>
> Any recommendations on how to improve would be appreciated.
>
> enjoy.
>
>
>
> package server;
>
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.DatagramPacket;
> import java.net.InetAddress;
> import java.net.MulticastSocket;
> import java.net.UnknownHostException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.HashMap;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Topic;
> import messages.BarsRequestMessage;
> import messages.QuoteRequestMessage;
> import messages.TicksRequestMessage;
> import org.apache.activemq.ActiveMQConnection;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQPrefetchPolicy;
> import org.apache.activemq.ActiveMQSession;
> import org.apache.activemq.ActiveMQTopicPublisher;
> import org.apache.activemq.ActiveMQTopicSession;
> import org.apache.activemq.ActiveMQTopicSubscriber;
> import org.apache.activemq.advisory.AdvisorySupport;
> import org.apache.activemq.broker.BrokerService;
> import
> org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
> import
> org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.command.ActiveMQDestination;
> import org.apache.activemq.command.ActiveMQMessage;
> import org.apache.activemq.command.ActiveMQObjectMessage;
> import org.apache.activemq.command.ActiveMQTopic;
> import org.apache.activemq.command.ConsumerInfo;
> import org.apache.activemq.memory.CacheEntryList;
> import org.apache.activemq.memory.CacheEvictionUsageListener;
> import org.apache.activemq.memory.UsageManager;
> import org.apache.log4j.Logger;
>
> public class QuoteProducer implements MessageListener
> {
>         protected static final DateFormat dateTimeFormat = new
> SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
>         protected static final String tcpURL = "tcp://localhost:3000";
>         protected static final String vmURL = "vm://localhost:4000";
>         static Logger logger = Logger.getLogger(QuoteProducer.class);
>         protected ActiveMQConnection connection = null;
>         protected ActiveMQTopicSession session = null;
>         protected HashMap<String, ActiveMQTopicPublisher> topicMap;
>         protected BrokerService broker;
>         protected InetAddress multiCastReceiveAddress = null;
>         protected int multiCastReceivePort;
>         protected MulticastSocket multiCastReceiveSocket = null;
>         protected QuoteListener feedThread;
>
>         public QuoteProducer()
>         {
>                 try
>                 {
>                         //PropertyConfigurator.configure("log4j.properties");
>
>                         topicMap = new HashMap<String, ActiveMQTopicPublisher>();
>                         broker = createBroker();
>                         connection = createConnection();
>                         session = createSession(connection);
>                         createTopicsListener();
>                         createTemporaryTopicListener();
>                         feedThread = new QuoteListener("238.0.0.9", 12344);
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>
>         BrokerService createBroker()
>         {
>                 BrokerService b = null;
>                 try
>                 {
>                         b = new BrokerService();
>                         PolicyEntry policy = new PolicyEntry();
>                         ConstantPendingMessageLimitStrategy limitStrategy = new
> ConstantPendingMessageLimitStrategy();
>                         limitStrategy.setLimit(1000);
>                         policy.setPendingMessageLimitStrategy(limitStrategy);
>                         policy.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
>                 policy.setSendAdvisoryIfNoConsumers(true);
>
>                         PolicyMap pMap = new PolicyMap();
>                         pMap.setDefaultEntry(policy);
>
>                         b .setDestinationPolicy(pMap);
>                         b.setPersistent(false);
>                         b .setUseJmx(false);
>
>                         UsageManager um = broker.getMemoryManager();
>                         um.setLimit(1024 * 1024 * 200);
>                         CacheEvictionUsageListener ceul = new CacheEvictionUsageListener(um, 90,
> 80, broker.getTaskRunnerFactory());
>
>                         CacheEntryList cel = new CacheEntryList();
>                         ceul.add(cel.createFIFOCacheEvictor());
>                         b.getMemoryManager().addUsageListener(ceul);
>                         b.setDeleteAllMessagesOnStartup(true);
>                         b.addConnector(tcpURL);
>                         b.addConnector(vmURL);
>                         b.start();
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>
>                 return b;
>         }
>
>         protected void publishToTopic(String topic, ActiveMQObjectMessage message)
>         {
>                 try
>                 {
>                         ActiveMQTopicPublisher publisher = topicMap.get(topic);
>                         if (publisher == null)
>                                 return;
>                         publisher.publish(message);
>                 }
>                 catch (JMSException e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>
>         protected boolean subscribersForTopic(String topic)
>         {
>                 return (topicMap.containsKey(topic));
>         }
>
>         protected void addSubscription(String topic)
>         {
>                 if (topic.startsWith("TEMP") || topic.startsWith("ID")||
> subscribersForTopic(topic))
>                         return;
>
>                 synchronized (topicMap)
>                 {
>                         ActiveMQTopicPublisher publisher = null;
>                         try
>                         {
>                                 publisher = createPublisher(session, topic);
>                                 topicMap.put(topic, publisher);
>                                 System.out.println("added topic: " + topic);
>                         }
>                         catch (JMSException e)
>                         {
>                                 logger.error(e.getStackTrace());
>                         }
>                 }
>         }
>
>         protected void removeSubscription(String topic)
>         {
>                 synchronized (topicMap)
>                 {
>                         if (!subscribersForTopic(topic))
>                                 return;
>
>                         ActiveMQTopicPublisher publisher = topicMap.remove(topic);
>                         if (publisher != null)
>                         {
>                                 try
>                                 {
>                                         publisher.close();
>                                         System.out.println("removed topic: " + topic);
>                                 }
>                                 catch (JMSException e)
>                                 {
>                                         logger.error(e.getStackTrace());
>                                 }
>                         }
>                 }
>         }
>
>         protected ActiveMQConnection createConnection() throws JMSException,
> Exception
>         {
>                 ActiveMQConnection c = null;
>                 try
>                 {
>                         ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
>                         prefetchPolicy.setMaximumPendingMessageLimit(1);
>                         prefetchPolicy.setTopicPrefetch(1);
>                         prefetchPolicy.setMaximumPendingMessageLimit(10);
>
>                         ActiveMQConnection con = null;
>                         ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory("Data Server",
>                                         AuthenticatingBroker.serverPassword, vmURL);
>                         connectionFactory.setAlwaysSessionAsync(true);
>                         connectionFactory.setUseAsyncSend(true);
>                         connectionFactory.setOptimizeAcknowledge(true);
>                         connectionFactory.setDisableTimeStampsByDefault(true);
>                         connectionFactory.setCopyMessageOnSend(false);
>                         connectionFactory.setUseCompression(true);
>                         connectionFactory.setPrefetchPolicy(prefetchPolicy);
>                         con = (ActiveMQConnection) connectionFactory.createConnection();
>
>                         con.start();
>                         return con;
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>                 return c;
>         }
>
>         protected ActiveMQTopicSession createSession(ActiveMQConnection connection)
> throws Exception
>         {
>                 ActiveMQTopicSession ses = (ActiveMQTopicSession)
> connection.createTopicSession(false,
>                                 ActiveMQSession.AUTO_ACKNOWLEDGE);
>                 return ses;
>         }
>
>         protected ActiveMQTopicPublisher createPublisher(ActiveMQTopicSession
> session, String msgTopic) throws JMSException
>         {
>                 ActiveMQTopic destination = (ActiveMQTopic) session.createTopic(msgTopic);
>                 ActiveMQTopicPublisher publisher = (ActiveMQTopicPublisher)
> session.createPublisher(destination);
>                 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>                 publisher.setDisableMessageID(true);
>                 publisher.setDisableMessageTimestamp(true);
>                 return publisher;
>         }
>
>         protected void createTemporaryTopicListener()
>         {
>                 try
>                 {
>                         String tempTopic = "TEMP MESSAGE";
>                         ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(tempTopic);
>
>                         ActiveMQTopicSubscriber tempSubscriber = (ActiveMQTopicSubscriber)
> session
>                                 .createSubscriber(topic);
>                         tempSubscriber.setMessageListener(this);
>
>                 }
>                 catch (JMSException e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>         protected void createTopicsListener()
>         {
>                 try
>                 {
>                         String msgTopic = ">"; // listen for advisories on all topics
>                         ActiveMQTopic allTopics = (ActiveMQTopic) session.createTopic(msgTopic);
>
>                         ActiveMQTopic noConsumerTopic =
> AdvisorySupport.getNoTopicConsumersAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber noConsumerSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(noConsumerTopic);
>                         noConsumerSubscriber.setMessageListener(this);
>
>                         ActiveMQTopic consumerTopic =
> AdvisorySupport.getConsumerAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber consumerSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(consumerTopic);
>                         consumerSubscriber.setMessageListener(this);
>
>                         /*
>                         ActiveMQTopic producerTopic =
> AdvisorySupport.getProducerAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber producerSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(producerTopic);
>                         producerSubscriber.setMessageListener(this);
>
>
>                         ActiveMQTopic connectionTopic =
> AdvisorySupport.getConnectionAdvisoryTopic();
>                         ActiveMQTopicSubscriber connectionSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(connectionTopic);
>                         connectionSubscriber.setMessageListener(this);
>                         ActiveMQTopic destinationTopic =
> AdvisorySupport.getDestinationAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber destinationSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(destinationTopic);
>                         destinationSubscriber.setMessageListener(this);
>                         */
>                 }
>                 catch (JMSException e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>
>         public static boolean isNoConsumerAdvisoryTopic(ActiveMQDestination
> destination)
>         {
>                 if (destination.isComposite())
>                 {
>                         ActiveMQDestination[] compositeDestinations =
> destination.getCompositeDestinations();
>                         for (int i = 0; i < compositeDestinations.length; i++)
>                         {
>                                 if (isNoConsumerAdvisoryTopic(compositeDestinations[i]))
>                                 {
>                                         return true;
>                                 }
>                         }
>                         return false;
>                 }
>                 else
>                 {
>                         return destination.isTopic()
>                                         &&
> destination.getPhysicalName().startsWith(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX);
>                 }
>         }
>
>         public static ActiveMQTopic getNoConsumerAdvisoryTopic(ActiveMQDestination
> destination)
>         {
>                 return new ActiveMQTopic(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX +
> destination.getPhysicalName());
>         }
>
>         public static String getNoConsumerAdvisoryTopic(String topic)
>         {
>                 String ret =
> topic.substring(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX.length());
>                 return ret;
>         }
>         public static String getConsumerAdvisoryTopic(String topic)
>         {
>                 String ret =
> topic.substring(AdvisorySupport.TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX.length());
>                 return ret;
>         }
>
>         public void onMessage(Message message)
>         {
>                 ActiveMQMessage activeMessage = (ActiveMQMessage) message;
>                 ActiveMQTopic destination = (ActiveMQTopic)
> activeMessage.getDestination();
>                 System.out.println(destination);
>
>                 if (AdvisorySupport.isAdvisoryTopic(destination))
>                 {
>                         if (AdvisorySupport.isConsumerAdvisoryTopic(destination))
>                         {
>                                 Object command = activeMessage.getDataStructure();
>                                 if (command != null)
>                                 {
>                                         if (command instanceof ConsumerInfo)
>                                         {
>                                                 String topic =
> getConsumerAdvisoryTopic(destination.getPhysicalName());
>                                                 addSubscription(topic);
>                                         }
>                                 }
>                         }
>                         else if (isNoConsumerAdvisoryTopic(destination))
>                         {
>                                 String topic =
> getNoConsumerAdvisoryTopic(destination.getPhysicalName());
>                                 removeSubscription(topic);
>                         }
>                 }
>                 else if (activeMessage instanceof ActiveMQObjectMessage)
>                 { //for out temorary request topics
>                         try
>                         {
>                                 if (requestMessage.getObject() instanceof QuoteRequestMessage)
>                                 {
>                                         new QuoteMessageRequester(requestMessage);
>                                 }
>                         }
>                         catch (JMSException e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                                 return;
>                         }
>                 }
>         }
>
>         class QuoteListener extends Thread
>         {
>                 InetAddress multiCastReceiveAddress;
>                 int multiCastReceivePort;
>                 MulticastSocket multiCastReceiveSocket;
>                 DatagramPacket packet;
>
>                 public QuoteListener(String receiveAddress, int receivePort)
>                 {
>                         try
>                         {
>                                 this.multiCastReceiveAddress = InetAddress.getByName(receiveAddress);
>                         }
>                         catch (UnknownHostException e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                         }
>                         this.multiCastReceivePort = receivePort;
>                         try
>                         {
>                                 multiCastReceiveSocket = new MulticastSocket(multiCastReceivePort);
>                                 multiCastReceiveSocket.setTimeToLive(5);
>                                 multiCastReceiveSocket.joinGroup(multiCastReceiveAddress);
>                         }
>                         catch (Exception e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                         }
>                         byte[] message = new byte[1000];
>                         packet = new DatagramPacket(message, message.length);
>                         start();
>                 }
>
>                 public void run()
>                 {
>                         try
>                         {
>                                 while (true)
>                                 {
>                                         multiCastReceiveSocket.receive(packet);
>                                         //long start = System.currentTimeMillis();
>                                         processPacket();
>                                         //long end = System.currentTimeMillis() - start;
>                                         //System.out.println("End of wait for " + end + " millis");
>                                 }
>                         }
>                         catch (Exception e)
>                         {
>                                 e.printStackTrace();
>                         }
>                 }
>
>                 public void processPacket()
>                 {
>                         try
>                         {
>                                 String topic = "TEST.QUOTE";
>                                 if (!subscribersForTopic(topic))
>                                         return;
>
>                                 String pretendObject= new String(topic + " " + "stuff");
>                                 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage)
>                                                 session.createObjectMessage();
>                                 objectMessage.setObject(pretendObject);
>                                 if (topic.length() > 0 && objectMessage != null)
>                                                         publishToTopic(topic, objectMessage);
>                         }
>                         catch (Exception e)
>                         {
>                                 System.out.println(e.getStackTrace());
>                         }
>                 }
>         }
>
>         class QuoteMessageRequester extends Thread
>         {
>                 ActiveMQObjectMessage requestMessage;
>                 QuoteMessageRequester(ActiveMQObjectMessage requestMessage)
>                 {
>                         this.requestMessage = requestMessage;
>                         start();
>                 }
>                 public void run()
>                 {
>                         try
>                         {
>                                 String pretendObject= new String("Quote  " + "stuff");
>                                 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage)
>                                                 session.createObjectMessage();
>                                 objectMessage.setObject(pretendObject);
>
>                                 sendReply(requestMessage, pretendObject);
>                         }
>                         catch (Exception e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                                 return;
>                         }
>                 }
>         }
>
>
>         public synchronized void sendReply(Message in, Serializable out)
>         {
>                 try
>                 {
>                         ActiveMQConnection con = createConnection();
>                         ActiveMQTopicSession ses = createSession(con);
>                         ActiveMQObjectMessage replyMessage = (ActiveMQObjectMessage)
> ses.createObjectMessage(out);
>                         replyMessage.setJMSCorrelationID(in.getJMSMessageID());
>                         Destination replyDestination = in.getJMSReplyTo();
>                         ActiveMQTopicPublisher pub = (ActiveMQTopicPublisher)
> ses.createPublisher((Topic) replyDestination);
>                         pub.publish(replyMessage);
>                         pub.close();
>                         ses.close();
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                         return;
>                 }
>         }
>         public static void main(String[] args)
>         {
>                 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
>                 String str;
>                 System.out.println("Enter 'quit' to exit\n");
>                 do
>                 {
>                         str = "";
>                         try
>                         {
>                                 str = br.readLine().toUpperCase();
>                         }
>                         catch (IOException e)
>                         {
>                                 e.printStackTrace();
>                         }
>                 }
>                 while (!str.equals("quit"));
>                 System.exit(0);
>         }
> }
>
> --
> View this message in context: http://www.nabble.com/included%3A-fully-functional-%28almost%29-quote-producer-tf2482597.html#a6922824
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


--

James
-------
http://radio.weblogs.com/0112098/
Reply | Threaded
Open this post in threaded view
|

Re: included: fully functional (almost) quote producer

millerkdm
Thanks James! I did not know about the single-producer thing. I will definitely give it a try.

I have thought about the last-message thing before, and I don't think it will work, as there are a couple hundred thousand symbols, most of which will never have a subscriber. So, when a user does subscribe to a new symbol, there may be no last message to send. And as memory consumption is definitely an issue with ActiveMQ, I do not want to keep all those quotes around in memory, anyway.  Also, since some quotes will not update for hours, or even days, I need to make sure the user always gets the last quote, and  my best solution, so far, is to just have the client request the last quote when he subscribes to a symbol. Not elegant, but simple.


James.Strachan wrote
Looks cool! :)

One way you could improve it is just using a single MessageProducer to
send messages; if you create a JMS MessageProducer passing in null for
the destination, you can then specify the destination each time you
send a message. Then you can just keep around a Map of Destinations
indexed by String - rather than having a full JMS MessageProducer for
each destination.

Another thought is - you might want to enable last image subscription
recovery; so whenever someone subscribes to a quote, they immediately
get the last image sent to them before any more updates are received.

http://incubator.apache.org/activemq/subscription-recovery-policy.html

I wonder if something vaguely like a quote server would be a useful
utility add-on to distribute with ActiveMQ? I can imagine quite a few
folks wishing to bridge some kind of quotes/market data to an ActiveMQ
network


On 10/20/06, millerkdm <millerkdm@yahoo.com> wrote:
>
> If someone would have posted something like this, I would have saved weeks of
> work.
>
> 1.      This is a fully-functional quote publisher  (with  quote objects, db
> stuff, and a few other things
> removed).
> 2.      It has an embedded broker.
> 3.      When quotes come in, it will publish quotes only if there is a subscriber
> for a quote.
> 4.      It will remove publishers when there are no longer clients subscribed to
> a topic.
> 5.      It sets memory and prefetch limits.
> 6.      It evicts messages for slow consumers.
> 7.      It includes a temporary topic request for the most recent quote, and
> replies to only the single requesting consumer.
>
>
> Any recommendations on how to improve would be appreciated.
>
> enjoy.
>
>
>
> package server;
>
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStreamReader;
> import java.io.Serializable;
> import java.net.DatagramPacket;
> import java.net.InetAddress;
> import java.net.MulticastSocket;
> import java.net.UnknownHostException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.HashMap;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Topic;
> import messages.BarsRequestMessage;
> import messages.QuoteRequestMessage;
> import messages.TicksRequestMessage;
> import org.apache.activemq.ActiveMQConnection;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQPrefetchPolicy;
> import org.apache.activemq.ActiveMQSession;
> import org.apache.activemq.ActiveMQTopicPublisher;
> import org.apache.activemq.ActiveMQTopicSession;
> import org.apache.activemq.ActiveMQTopicSubscriber;
> import org.apache.activemq.advisory.AdvisorySupport;
> import org.apache.activemq.broker.BrokerService;
> import
> org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
> import
> org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.command.ActiveMQDestination;
> import org.apache.activemq.command.ActiveMQMessage;
> import org.apache.activemq.command.ActiveMQObjectMessage;
> import org.apache.activemq.command.ActiveMQTopic;
> import org.apache.activemq.command.ConsumerInfo;
> import org.apache.activemq.memory.CacheEntryList;
> import org.apache.activemq.memory.CacheEvictionUsageListener;
> import org.apache.activemq.memory.UsageManager;
> import org.apache.log4j.Logger;
>
> public class QuoteProducer implements MessageListener
> {
>         protected static final DateFormat dateTimeFormat = new
> SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
>         protected static final String tcpURL = "tcp://localhost:3000";
>         protected static final String vmURL = "vm://localhost:4000";
>         static Logger logger = Logger.getLogger(QuoteProducer.class);
>         protected ActiveMQConnection connection = null;
>         protected ActiveMQTopicSession session = null;
>         protected HashMap<String, ActiveMQTopicPublisher> topicMap;
>         protected BrokerService broker;
>         protected InetAddress multiCastReceiveAddress = null;
>         protected int multiCastReceivePort;
>         protected MulticastSocket multiCastReceiveSocket = null;
>         protected QuoteListener feedThread;
>
>         public QuoteProducer()
>         {
>                 try
>                 {
>                         //PropertyConfigurator.configure("log4j.properties");
>
>                         topicMap = new HashMap<String, ActiveMQTopicPublisher>();
>                         broker = createBroker();
>                         connection = createConnection();
>                         session = createSession(connection);
>                         createTopicsListener();
>                         createTemporaryTopicListener();
>                         feedThread = new QuoteListener("238.0.0.9", 12344);
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>
>         BrokerService createBroker()
>         {
>                 BrokerService b = null;
>                 try
>                 {
>                         b = new BrokerService();
>                         PolicyEntry policy = new PolicyEntry();
>                         ConstantPendingMessageLimitStrategy limitStrategy = new
> ConstantPendingMessageLimitStrategy();
>                         limitStrategy.setLimit(1000);
>                         policy.setPendingMessageLimitStrategy(limitStrategy);
>                         policy.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
>                 policy.setSendAdvisoryIfNoConsumers(true);
>
>                         PolicyMap pMap = new PolicyMap();
>                         pMap.setDefaultEntry(policy);
>
>                         b .setDestinationPolicy(pMap);
>                         b.setPersistent(false);
>                         b .setUseJmx(false);
>
>                         UsageManager um = broker.getMemoryManager();
>                         um.setLimit(1024 * 1024 * 200);
>                         CacheEvictionUsageListener ceul = new CacheEvictionUsageListener(um, 90,
> 80, broker.getTaskRunnerFactory());
>
>                         CacheEntryList cel = new CacheEntryList();
>                         ceul.add(cel.createFIFOCacheEvictor());
>                         b.getMemoryManager().addUsageListener(ceul);
>                         b.setDeleteAllMessagesOnStartup(true);
>                         b.addConnector(tcpURL);
>                         b.addConnector(vmURL);
>                         b.start();
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>
>                 return b;
>         }
>
>         protected void publishToTopic(String topic, ActiveMQObjectMessage message)
>         {
>                 try
>                 {
>                         ActiveMQTopicPublisher publisher = topicMap.get(topic);
>                         if (publisher == null)
>                                 return;
>                         publisher.publish(message);
>                 }
>                 catch (JMSException e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>
>         protected boolean subscribersForTopic(String topic)
>         {
>                 return (topicMap.containsKey(topic));
>         }
>
>         protected void addSubscription(String topic)
>         {
>                 if (topic.startsWith("TEMP") || topic.startsWith("ID")||
> subscribersForTopic(topic))
>                         return;
>
>                 synchronized (topicMap)
>                 {
>                         ActiveMQTopicPublisher publisher = null;
>                         try
>                         {
>                                 publisher = createPublisher(session, topic);
>                                 topicMap.put(topic, publisher);
>                                 System.out.println("added topic: " + topic);
>                         }
>                         catch (JMSException e)
>                         {
>                                 logger.error(e.getStackTrace());
>                         }
>                 }
>         }
>
>         protected void removeSubscription(String topic)
>         {
>                 synchronized (topicMap)
>                 {
>                         if (!subscribersForTopic(topic))
>                                 return;
>
>                         ActiveMQTopicPublisher publisher = topicMap.remove(topic);
>                         if (publisher != null)
>                         {
>                                 try
>                                 {
>                                         publisher.close();
>                                         System.out.println("removed topic: " + topic);
>                                 }
>                                 catch (JMSException e)
>                                 {
>                                         logger.error(e.getStackTrace());
>                                 }
>                         }
>                 }
>         }
>
>         protected ActiveMQConnection createConnection() throws JMSException,
> Exception
>         {
>                 ActiveMQConnection c = null;
>                 try
>                 {
>                         ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
>                         prefetchPolicy.setMaximumPendingMessageLimit(1);
>                         prefetchPolicy.setTopicPrefetch(1);
>                         prefetchPolicy.setMaximumPendingMessageLimit(10);
>
>                         ActiveMQConnection con = null;
>                         ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory("Data Server",
>                                         AuthenticatingBroker.serverPassword, vmURL);
>                         connectionFactory.setAlwaysSessionAsync(true);
>                         connectionFactory.setUseAsyncSend(true);
>                         connectionFactory.setOptimizeAcknowledge(true);
>                         connectionFactory.setDisableTimeStampsByDefault(true);
>                         connectionFactory.setCopyMessageOnSend(false);
>                         connectionFactory.setUseCompression(true);
>                         connectionFactory.setPrefetchPolicy(prefetchPolicy);
>                         con = (ActiveMQConnection) connectionFactory.createConnection();
>
>                         con.start();
>                         return con;
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>                 return c;
>         }
>
>         protected ActiveMQTopicSession createSession(ActiveMQConnection connection)
> throws Exception
>         {
>                 ActiveMQTopicSession ses = (ActiveMQTopicSession)
> connection.createTopicSession(false,
>                                 ActiveMQSession.AUTO_ACKNOWLEDGE);
>                 return ses;
>         }
>
>         protected ActiveMQTopicPublisher createPublisher(ActiveMQTopicSession
> session, String msgTopic) throws JMSException
>         {
>                 ActiveMQTopic destination = (ActiveMQTopic) session.createTopic(msgTopic);
>                 ActiveMQTopicPublisher publisher = (ActiveMQTopicPublisher)
> session.createPublisher(destination);
>                 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>                 publisher.setDisableMessageID(true);
>                 publisher.setDisableMessageTimestamp(true);
>                 return publisher;
>         }
>
>         protected void createTemporaryTopicListener()
>         {
>                 try
>                 {
>                         String tempTopic = "TEMP MESSAGE";
>                         ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(tempTopic);
>
>                         ActiveMQTopicSubscriber tempSubscriber = (ActiveMQTopicSubscriber)
> session
>                                 .createSubscriber(topic);
>                         tempSubscriber.setMessageListener(this);
>
>                 }
>                 catch (JMSException e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>         protected void createTopicsListener()
>         {
>                 try
>                 {
>                         String msgTopic = ">"; // listen for advisories on all topics
>                         ActiveMQTopic allTopics = (ActiveMQTopic) session.createTopic(msgTopic);
>
>                         ActiveMQTopic noConsumerTopic =
> AdvisorySupport.getNoTopicConsumersAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber noConsumerSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(noConsumerTopic);
>                         noConsumerSubscriber.setMessageListener(this);
>
>                         ActiveMQTopic consumerTopic =
> AdvisorySupport.getConsumerAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber consumerSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(consumerTopic);
>                         consumerSubscriber.setMessageListener(this);
>
>                         /*
>                         ActiveMQTopic producerTopic =
> AdvisorySupport.getProducerAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber producerSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(producerTopic);
>                         producerSubscriber.setMessageListener(this);
>
>
>                         ActiveMQTopic connectionTopic =
> AdvisorySupport.getConnectionAdvisoryTopic();
>                         ActiveMQTopicSubscriber connectionSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(connectionTopic);
>                         connectionSubscriber.setMessageListener(this);
>                         ActiveMQTopic destinationTopic =
> AdvisorySupport.getDestinationAdvisoryTopic(allTopics);
>                         ActiveMQTopicSubscriber destinationSubscriber = (ActiveMQTopicSubscriber)
> session
>                                         .createSubscriber(destinationTopic);
>                         destinationSubscriber.setMessageListener(this);
>                         */
>                 }
>                 catch (JMSException e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                 }
>         }
>
>         public static boolean isNoConsumerAdvisoryTopic(ActiveMQDestination
> destination)
>         {
>                 if (destination.isComposite())
>                 {
>                         ActiveMQDestination[] compositeDestinations =
> destination.getCompositeDestinations();
>                         for (int i = 0; i < compositeDestinations.length; i++)
>                         {
>                                 if (isNoConsumerAdvisoryTopic(compositeDestinations[i]))
>                                 {
>                                         return true;
>                                 }
>                         }
>                         return false;
>                 }
>                 else
>                 {
>                         return destination.isTopic()
>                                         &&
> destination.getPhysicalName().startsWith(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX);
>                 }
>         }
>
>         public static ActiveMQTopic getNoConsumerAdvisoryTopic(ActiveMQDestination
> destination)
>         {
>                 return new ActiveMQTopic(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX +
> destination.getPhysicalName());
>         }
>
>         public static String getNoConsumerAdvisoryTopic(String topic)
>         {
>                 String ret =
> topic.substring(AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX.length());
>                 return ret;
>         }
>         public static String getConsumerAdvisoryTopic(String topic)
>         {
>                 String ret =
> topic.substring(AdvisorySupport.TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX.length());
>                 return ret;
>         }
>
>         public void onMessage(Message message)
>         {
>                 ActiveMQMessage activeMessage = (ActiveMQMessage) message;
>                 ActiveMQTopic destination = (ActiveMQTopic)
> activeMessage.getDestination();
>                 System.out.println(destination);
>
>                 if (AdvisorySupport.isAdvisoryTopic(destination))
>                 {
>                         if (AdvisorySupport.isConsumerAdvisoryTopic(destination))
>                         {
>                                 Object command = activeMessage.getDataStructure();
>                                 if (command != null)
>                                 {
>                                         if (command instanceof ConsumerInfo)
>                                         {
>                                                 String topic =
> getConsumerAdvisoryTopic(destination.getPhysicalName());
>                                                 addSubscription(topic);
>                                         }
>                                 }
>                         }
>                         else if (isNoConsumerAdvisoryTopic(destination))
>                         {
>                                 String topic =
> getNoConsumerAdvisoryTopic(destination.getPhysicalName());
>                                 removeSubscription(topic);
>                         }
>                 }
>                 else if (activeMessage instanceof ActiveMQObjectMessage)
>                 { //for out temorary request topics
>                         try
>                         {
>                                 if (requestMessage.getObject() instanceof QuoteRequestMessage)
>                                 {
>                                         new QuoteMessageRequester(requestMessage);
>                                 }
>                         }
>                         catch (JMSException e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                                 return;
>                         }
>                 }
>         }
>
>         class QuoteListener extends Thread
>         {
>                 InetAddress multiCastReceiveAddress;
>                 int multiCastReceivePort;
>                 MulticastSocket multiCastReceiveSocket;
>                 DatagramPacket packet;
>
>                 public QuoteListener(String receiveAddress, int receivePort)
>                 {
>                         try
>                         {
>                                 this.multiCastReceiveAddress = InetAddress.getByName(receiveAddress);
>                         }
>                         catch (UnknownHostException e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                         }
>                         this.multiCastReceivePort = receivePort;
>                         try
>                         {
>                                 multiCastReceiveSocket = new MulticastSocket(multiCastReceivePort);
>                                 multiCastReceiveSocket.setTimeToLive(5);
>                                 multiCastReceiveSocket.joinGroup(multiCastReceiveAddress);
>                         }
>                         catch (Exception e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                         }
>                         byte[] message = new byte[1000];
>                         packet = new DatagramPacket(message, message.length);
>                         start();
>                 }
>
>                 public void run()
>                 {
>                         try
>                         {
>                                 while (true)
>                                 {
>                                         multiCastReceiveSocket.receive(packet);
>                                         //long start = System.currentTimeMillis();
>                                         processPacket();
>                                         //long end = System.currentTimeMillis() - start;
>                                         //System.out.println("End of wait for " + end + " millis");
>                                 }
>                         }
>                         catch (Exception e)
>                         {
>                                 e.printStackTrace();
>                         }
>                 }
>
>                 public void processPacket()
>                 {
>                         try
>                         {
>                                 String topic = "TEST.QUOTE";
>                                 if (!subscribersForTopic(topic))
>                                         return;
>
>                                 String pretendObject= new String(topic + " " + "stuff");
>                                 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage)
>                                                 session.createObjectMessage();
>                                 objectMessage.setObject(pretendObject);
>                                 if (topic.length() > 0 && objectMessage != null)
>                                                         publishToTopic(topic, objectMessage);
>                         }
>                         catch (Exception e)
>                         {
>                                 System.out.println(e.getStackTrace());
>                         }
>                 }
>         }
>
>         class QuoteMessageRequester extends Thread
>         {
>                 ActiveMQObjectMessage requestMessage;
>                 QuoteMessageRequester(ActiveMQObjectMessage requestMessage)
>                 {
>                         this.requestMessage = requestMessage;
>                         start();
>                 }
>                 public void run()
>                 {
>                         try
>                         {
>                                 String pretendObject= new String("Quote  " + "stuff");
>                                 ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage)
>                                                 session.createObjectMessage();
>                                 objectMessage.setObject(pretendObject);
>
>                                 sendReply(requestMessage, pretendObject);
>                         }
>                         catch (Exception e)
>                         {
>                                 logger.error(e);
>                                 e.printStackTrace();
>                                 return;
>                         }
>                 }
>         }
>
>
>         public synchronized void sendReply(Message in, Serializable out)
>         {
>                 try
>                 {
>                         ActiveMQConnection con = createConnection();
>                         ActiveMQTopicSession ses = createSession(con);
>                         ActiveMQObjectMessage replyMessage = (ActiveMQObjectMessage)
> ses.createObjectMessage(out);
>                         replyMessage.setJMSCorrelationID(in.getJMSMessageID());
>                         Destination replyDestination = in.getJMSReplyTo();
>                         ActiveMQTopicPublisher pub = (ActiveMQTopicPublisher)
> ses.createPublisher((Topic) replyDestination);
>                         pub.publish(replyMessage);
>                         pub.close();
>                         ses.close();
>                 }
>                 catch (Exception e)
>                 {
>                         logger.error(e);
>                         e.printStackTrace();
>                         return;
>                 }
>         }
>         public static void main(String[] args)
>         {
>                 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
>                 String str;
>                 System.out.println("Enter 'quit' to exit\n");
>                 do
>                 {
>                         str = "";
>                         try
>                         {
>                                 str = br.readLine().toUpperCase();
>                         }
>                         catch (IOException e)
>                         {
>                                 e.printStackTrace();
>                         }
>                 }
>                 while (!str.equals("quit"));
>                 System.exit(0);
>         }
> }
>
> --
> View this message in context: http://www.nabble.com/included%3A-fully-functional-%28almost%29-quote-producer-tf2482597.html#a6922824
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


--

James
-------
http://radio.weblogs.com/0112098/
Reply | Threaded
Open this post in threaded view
|

Re: included: fully functional (almost) quote producer

James Strachan-2
On 10/24/06, millerkdm <[hidden email]> wrote:
>
> Thanks James! I did not know about the single-producer thing. I will
> definitely give it a try.
>
> I have thought about the last-message thing before, and I don't think it
> will work, as there are a couple hundred thousand symbols, most of which
> will never have a subscriber.

You can just a simple LRU cache of a fixed size with TimeToLive to
keep on top of RAM usage etc


> So, when a user does subscribe to a new
> symbol, there may be no last message to send.

OK


>  And as memory consumption is
> definitely an issue with ActiveMQ, I do not want to keep all those quotes
> around in memory, anyway.  Also, since some quotes will not update for
> hours, or even days, I need to make sure the user always gets the last
> quote, and  my best solution, so far, is to just have the client request the
> last quote when he subscribes to a symbol. Not elegant, but simple.

FWIW you could integrate in the 'request the last quote' service into
the broker so its hidden from the client if you wish.

So you just need to implement this interface
http://incubator.apache.org/activemq/maven/activemq-core/apidocs/org/apache/activemq/broker/region/policy/MessageQuery.html

and use the QueryBasedSubscriptionRecoveryPolicy
http://incubator.apache.org/activemq/maven/activemq-core/apidocs/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.html

--

James
-------
http://radio.weblogs.com/0112098/