Non static WebClient?

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Non static WebClient?

Gregw
Hi,

I was trying to tidy up the Ajax code so that consumers closed when
sessions timeout (or long polls stop coming).   But the queueConsumer
map in WebClient is static and key only by destination, which means:

 + You can only have one consumer per queue.    I can imagine Ajax
   apps that want to hand out messages to one of many clients and thus
   having multiple consumers would be a good way to do this.

 + As it stands, you don't know when the consumer is no longer needed.
   So it will live forever even if all sessions timeout.

I've reworked the code so that queue consumers are associated with
the httpSession (just as topic consumers) and they use the common
jms session. unsubscribe now closes consumers as will long poll
timeout and ending the session.

But I don't want to check it in, as I don't understand why the
consumers were static in the first place.  Some effort was put
into the static code and recovering the map from context
attributes etc.  So I'd like to double check that I'm not
missing something?

I've attached a patch and would appreciate any comments.

cheers





Index: activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (working copy)
@@ -55,6 +55,14 @@
  * specify a readTimeout parameter to determine how long the servlet should
  * block for.
  *
+ * The servlet can be configured with the following init parameters:<dl>
+ * <dt>defaultReadTimeout</dt><dd>The default time in ms to wait for messages. May be overridden by a request using the 'timeout' parameter</dd>
+ * <dt>maximumReadTimeout</dt><dd>The maximum value a request may specify for the 'timeout' parameter</dd>
+ * <dt>maximumMessages</dt><dd>maximum messages to send per response</dd>
+ * <dt></dt><dd></dd>
+ * </dl>
+ *  
+ *
  * @version $Revision: 1.1.1.1 $
  */
 public class MessageListenerServlet extends MessageServletSupport {
@@ -132,6 +140,7 @@
                     {
                         Listener listener = getListener(request);
                         Map consumerIdMap = getConsumerIdMap(request);
+                        client.closeConsumer(destination); // drop any existing consumer.
                         MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
                         
                         consumer.setAvailableListener(listener);
@@ -145,9 +154,9 @@
                         Map consumerIdMap = getConsumerIdMap(request);
                         MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
                         
-                        // TODO should we destroy consumer on unsubscribe?
                         consumer.setAvailableListener(null);
                         consumerIdMap.remove(consumer);
+                        client.closeConsumer(destination);
                         if (log.isDebugEnabled()) {
                             log.debug("Unsubscribed: "+consumer);
                         }
@@ -189,7 +198,6 @@
             response.setHeader("Cache-Control", "no-cache");
             response.getWriter().print("<ajax-response></ajax-response>");
         }
-        // System.err.println("==");
     }
 
     /**
@@ -209,7 +217,6 @@
         catch (JMSException e) {
             throw new ServletException("JMS problem: " + e, e);
         }
-        // System.err.println("--");
     }
 
     /**
@@ -232,11 +239,12 @@
             log.debug("doMessage timeout="+timeout);
         }
         
-        Continuation continuation = null;
-        Message message = null;
-
+        Continuation continuation = ContinuationSupport.getContinuation(request, client);
         Listener listener = getListener(request);
+        if (listener!=null && continuation!=null && !continuation.isPending())
+            listener.access();
 
+        Message message = null;
         synchronized (client) {
 
             List consumers = client.getConsumers();
@@ -259,8 +267,6 @@
             // messages
 
             if (message == null) {
-                continuation = ContinuationSupport.getContinuation(request, client);
-
                 // register this continuation with our listener.
                 listener.setContinuation(continuation);
 
@@ -268,6 +274,7 @@
                 // request here).
                 continuation.suspend(timeout);
             }
+            listener.setContinuation(null);
 
             // prepare the responds
             response.setContentType("text/xml");
@@ -299,7 +306,6 @@
 
                 // Look for any available messages
                 message = consumer.receiveNoWait();
-                // System.err.println("received "+message+" from "+consumer);
                 while (message != null && messages < maximumMessages) {
                     String id = (String) consumerIdMap.get(consumer);
                     writer.print("<response id='");
@@ -314,6 +320,7 @@
 
             // Add poll message
             // writer.println("<response type='object' id='amqPoll'><ok/></response>");
+            
             writer.print("</ajax-response>");
 
             writer.flush();
@@ -386,15 +393,18 @@
      */
     private class Listener implements MessageAvailableListener {
         WebClient client;
-
+        long lastAccess;
         Continuation continuation;
 
-        List queue = new LinkedList();
-
         Listener(WebClient client) {
             this.client = client;
         }
 
+        public void access()
+        {
+            lastAccess=System.currentTimeMillis();
+        }
+        
         public void setContinuation(Continuation continuation) {
             synchronized (client) {
                 this.continuation = continuation;
@@ -408,21 +418,13 @@
                 }
                 if (continuation != null)
                     continuation.resume();
+                else if (System.currentTimeMillis()-lastAccess>2*maximumReadTimeout)
+                {
+                    client.closeConsumers();
+                }
                 continuation = null;
             }
         }
 
     }
-
-    private static void dump(Map map)
-    {
-        Iterator iter=map.entrySet().iterator();
-        while(iter.hasNext())
-        {
-            Map.Entry entry=(Map.Entry)iter.next();
-            String k=(String)entry.getKey();
-            String[] v=(String[])entry.getValue();
-            System.err.println(k+":"+(v==null?"[]":Arrays.asList(v).toString()));
-        }
-    }
 }
Index: activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java (revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java (working copy)
@@ -38,6 +38,11 @@
  * there are various ways to map JMS operations to web requests
  * so we put most of the common behaviour in a reusable base class.
  *
+ * This servlet can be configured with the following init paramters <dl>
+ * <dt>topic</dt><dd>Set to 'true' if the servle should default to using topics rather than channels</dd>
+ * <dt>destination</dt><dd>The default destination to use if one is not specifiied</dd>
+ * <dt></dt><dd></dd>
+ * </dl>
  * @version $Revision: 1.1.1.1 $
  */
 public abstract class MessageServletSupport extends HttpServlet {
@@ -74,7 +79,7 @@
     }
 
     protected WebClient createWebClient(HttpServletRequest request) {
-        return new WebClient(getServletContext());
+        return new WebClient();
     }
 
     public static boolean asBoolean(String param) {
@@ -99,7 +104,7 @@
     protected WebClient getWebClient(HttpServletRequest request) {
         HttpSession session = request.getSession(true);
         WebClient client = WebClient.getWebClient(session);
-        if (client == null) {
+        if (client == null || client.isClosed()) {
             client = createWebClient(request);
             session.setAttribute(WebClient.webClientAttribute, client);
         }
Index: activemq-web/src/main/java/org/apache/activemq/web/ConnectionManager.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/ConnectionManager.java (revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/ConnectionManager.java (working copy)
@@ -1,49 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.web;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.servlet.http.HttpSessionEvent;
-import javax.servlet.http.HttpSessionListener;
-
-/**
- * Listens to sessions closing to ensure that JMS connections are
- * cleaned up nicely
- *
- * @version $Revision: 1.1.1.1 $
- */
-public class ConnectionManager implements HttpSessionListener {
-    private static final Log log = LogFactory.getLog(ConnectionManager.class);
-
-    public void sessionCreated(HttpSessionEvent event) {
-    }
-
-    public void sessionDestroyed(HttpSessionEvent event) {
-        /** TODO we can't use the session any more now!
-         WebClient client = WebClient.getWebClient(event.getSession());
-         try {
-         client.stop();
-         }
-         catch (JMSException e) {
-         log.warn("Error closing connection: " + e, e);
-         }
-         */
-    }
-}
Index: activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/WebClient.java (revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/WebClient.java (working copy)
@@ -35,43 +35,44 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.Topic;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpSession;
 import javax.servlet.http.HttpSessionActivationListener;
+import javax.servlet.http.HttpSessionBindingEvent;
+import javax.servlet.http.HttpSessionBindingListener;
 import javax.servlet.http.HttpSessionEvent;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.MessageAvailableConsumer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
 
 /**
  * Represents a messaging client used from inside a web container
  * typically stored inside a HttpSession
+ *
+ * TODO controls to prevent DOS attacks with users requesting many consumers
  *
  * @version $Revision: 1.1.1.1 $
  */
-public class WebClient implements HttpSessionActivationListener, Externalizable {
+public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
     public static final String webClientAttribute = "org.apache.activemq.webclient";
     public static final String connectionFactoryAttribute = "org.apache.activemq.connectionFactory";
-    public static final String queueConsumersAttribute = "org.apache.activemq.queueConsumers";
     public static final String brokerUrlInitParam = "org.apache.activemq.brokerURL";
 
     private static final Log log = LogFactory.getLog(WebClient.class);
 
     private static transient ConnectionFactory factory;
-    private static transient Map queueConsumers;
+    
+    private transient Map consumers = new HashMap();
 
-    private transient ServletContext context;
     private transient ActiveMQConnection connection;
     private transient ActiveMQSession session;
     private transient MessageProducer producer;
-    private transient Map topicConsumers = new ConcurrentHashMap();
     private int deliveryMode = DeliveryMode.NON_PERSISTENT;
 
     private final Semaphore semaphore = new Semaphore(1);
@@ -86,26 +87,16 @@
 
 
     public static void initContext(ServletContext context) {
-        factory = initConnectionFactory(context);
-        if (factory == null) {
-            log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
-            factory = new ActiveMQConnectionFactory("vm://localhost");
-            context.setAttribute(connectionFactoryAttribute, factory);
-        }
-        queueConsumers = initQueueConsumers(context);
+        initConnectionFactory(context);
     }
 
     /**
-     * Only called by serialization
      */
     public WebClient() {
+        if (factory==null)
+            throw new IllegalStateException("initContext(ServletContext) not called");
     }
 
-    public WebClient(ServletContext context) {
-        this.context = context;
-        initContext(context);
-    }
-
     
     public int getDeliveryMode() {
         return deliveryMode;
@@ -117,28 +108,52 @@
     }
 
 
-    public void start() throws JMSException {
+    public synchronized void closeConsumers()
+    {
+        for (Iterator it = consumers.values().iterator(); it.hasNext();) {
+            MessageConsumer consumer = (MessageConsumer) it.next();
+            it.remove();
+            try{
+                consumer.setMessageListener(null);
+                if (consumer instanceof MessageAvailableConsumer)
+                    ((MessageAvailableConsumer)consumer).setAvailableListener(null);
+                consumer.close();
+            }
+            catch(JMSException e)
+            {
+                e.printStackTrace();
+            }
+        }
     }
 
-    public void stop() throws JMSException {
-        System.out.println("Closing the WebClient!!! " + this);
-        
+    public synchronized void close() {
         try {
-            connection.close();
+            closeConsumers();
+            if (connection!=null)
+                connection.close();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
         }
         finally {
             producer = null;
             session = null;
             connection = null;
-            topicConsumers.clear();
+            if (consumers!=null)
+                consumers.clear();
+            consumers=null;
         }
     }
+    
+    public boolean isClosed()
+    {
+        return consumers==null;
+    }
 
     public void writeExternal(ObjectOutput out) throws IOException {
     }
 
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        topicConsumers = new HashMap();
+        consumers = new HashMap();
     }
 
     public void send(Destination destination, Message message) throws JMSException {
@@ -167,103 +182,75 @@
         return connection;
     }
 
-    public void sessionWillPassivate(HttpSessionEvent event) {
-        try {
-            stop();
-        }
-        catch (JMSException e) {
-            log.warn("Could not close connection: " + e, e);
-        }
-    }
-
-    public void sessionDidActivate(HttpSessionEvent event) {
-        // lets update the connection factory from the servlet context
-        context = event.getSession().getServletContext();
-        initContext(context);
-    }
-
-    public static Map initQueueConsumers(ServletContext context) {
-        Map answer = (Map) context.getAttribute(queueConsumersAttribute);
-        if (answer == null) {
-            answer = new HashMap();
-            context.setAttribute(queueConsumersAttribute, answer);
-        }
-        return answer;
-    }
-
-
-    public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
-        ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
-        if (connectionFactory == null) {
-            String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);
-
+    public static synchronized void initConnectionFactory(ServletContext servletContext) {
+        if (factory==null)
+            factory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
+        if (factory == null) {
+            String brokerURL = servletContext.getInitParameter(brokerUrlInitParam);
+            
             servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
-
+            
             if (brokerURL == null) {
                 brokerURL = "vm://localhost";
             }
+            
+            ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
+            factory = amqfactory;
+          
+            servletContext.setAttribute(connectionFactoryAttribute, factory);
+        }
+    }
 
+    public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
+        return getConsumer(destination,true);
+    }
 
-            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
-            connectionFactory = factory;
-            servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
+    public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException {
+        
+        MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
+        if (create && consumer == null) {
+            consumer = getSession().createConsumer(destination);
+            consumers.put(destination, consumer);
         }
-        return connectionFactory;
+        return consumer;
     }
 
-    public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
-        if (destination instanceof Topic) {
-            MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
-            if (consumer == null) {
-                consumer = getSession().createConsumer(destination);
-                topicConsumers.put(destination, consumer);
-            }
-            return consumer;
+    public synchronized void closeConsumer(Destination destination) throws JMSException {
+        MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
+        if (consumer != null) {
+            consumers.remove(destination);
+            consumer.setMessageListener(null);
+            if (consumer instanceof MessageAvailableConsumer)
+                ((MessageAvailableConsumer)consumer).setAvailableListener(null);
+            consumer.close();
         }
-        else {
-            synchronized (queueConsumers) {
-                SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
-                if (pair == null) {
-                    pair = createSessionConsumerPair(destination);
-                    queueConsumers.put(destination, pair);
-                }
-                return pair.consumer;
-            }
-        }
     }
     
     public synchronized List getConsumers()
     {
-        ArrayList list = new ArrayList(topicConsumers.size()+queueConsumers.size());
-        
-        // TODO check this double synchronization on queue but not on topics
-        synchronized (queueConsumers) {
-            for (Iterator it = queueConsumers.values().iterator(); it.hasNext();) {
-                SessionConsumerPair pair = (SessionConsumerPair) it.next();
-                list.add(pair.consumer);
-            }
-        }
-        list.addAll(topicConsumers.values());
-        return list;
+        return new ArrayList(consumers.values());
     }
 
     protected ActiveMQSession createSession() throws JMSException {
         return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
-    protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
-        SessionConsumerPair answer = new SessionConsumerPair();
-        answer.session = createSession();
-        answer.consumer = answer.session.createConsumer(destination);
-        return answer;
+
+    public Semaphore getSemaphore() {
+        return semaphore;
     }
 
-    protected static class SessionConsumerPair {
-        public Session session;
-        public MessageConsumer consumer;
+    public void sessionWillPassivate(HttpSessionEvent event) {
+        close();
     }
 
-    public Semaphore getSemaphore() {
-        return semaphore;
+    public void sessionDidActivate(HttpSessionEvent event) {
     }
+
+    public void valueBound(HttpSessionBindingEvent event) {
+    }
+
+    public void valueUnbound(HttpSessionBindingEvent event) {
+        close();
+    }
 }
Index: activemq-web/src/main/webapp/WEB-INF/web.xml
===================================================================
--- activemq-web/src/main/webapp/WEB-INF/web.xml (revision 388492)
+++ activemq-web/src/main/webapp/WEB-INF/web.xml (working copy)
@@ -38,12 +38,6 @@
         <description>Whether we should include an embedded broker or not</description>
     </context-param>
 
-    <!-- connection manager -->
-    <listener>
-        <listener-class>org.apache.activemq.web.ConnectionManager</listener-class>
-    </listener>
-
-
     <!-- servlet mappings -->
     
     <!-- the subscription REST servlet -->
Reply | Threaded
Open this post in threaded view
|

Re: Non static WebClient?

James Strachan-2
On 3/24/06, Greg Wilkins <[hidden email]> wrote:

> Hi,
>
> I was trying to tidy up the Ajax code so that consumers closed when
> sessions timeout (or long polls stop coming).   But the queueConsumer
> map in WebClient is static and key only by destination, which means:
>
>  + You can only have one consumer per queue.    I can imagine Ajax
>    apps that want to hand out messages to one of many clients and thus
>    having multiple consumers would be a good way to do this.
>
>  + As it stands, you don't know when the consumer is no longer needed.
>    So it will live forever even if all sessions timeout.
>
> I've reworked the code so that queue consumers are associated with
> the httpSession (just as topic consumers) and they use the common
> jms session. unsubscribe now closes consumers as will long poll
> timeout and ending the session.
>
> But I don't want to check it in, as I don't understand why the
> consumers were static in the first place.  Some effort was put
> into the static code and recovering the map from context
> attributes etc.  So I'd like to double check that I'm not
> missing something?
>
> I've attached a patch and would appreciate any comments.

I think I remember now why it was done like that.

Topic consumers don't really interfere with each other, they are
atomic things; if the come and go it doesn't affect anyone else much.

Queue consumers however do compete with each other;  creating a single
consumer will effectively grab a whole bunch of messages ready to be
dispatched to the client when ready (which is generally as soon as is
possible with normal JMS clients).

The worry is messages will just sit there in the consumer, not being
consumed by available consumers. So I think the idea was for web
clients to pull messages out of a single consumer to minimise the
number of messages that get stuck in these inbound message buffers.

However having a consumer per subscription & http session is much
cleaner - so a better way is maybe to tweak ActiveMQ to work nicer in
this slightly different web-subscription model. e.g. we can set the
prefetchSize to 1 or even 0 to minimise the number of messages that
just end up getting stuck in a consumer before they start being
actually consumed.

One change we should add to ActiveMQ to further minimise this problem
is that if a consumer is created - and it then recieves messages and
then does not process them within some time period, the messages are
given back to the server so that they can be dispatched to another
consumer (together with lowering the priority of the consumer). Then
if lots of consumers are created that time out, the messages are taken
back and given to a more active consumer.

--

James
-------
http://radio.weblogs.com/0112098/
Reply | Threaded
Open this post in threaded view
|

Re: Non static WebClient?

Gregw

James,

OK I understand that now.   Shall I'll modify the patch so that it can either
share or create individual consumers for a queue.   Or do you think with
the tweaks non-shared is sufficient?

If we keep shared, I'll add a reference count on the shared consumer so we
know when to clean it up.

For the non-shared consumer, my current patch will close an idle
consumer that is not being long-polled anymore.   Does a close
return any prefetched messages to the queue?

cheers


James Strachan wrote:

> On 3/24/06, Greg Wilkins <[hidden email]> wrote:
>
>>Hi,
>>
>>I was trying to tidy up the Ajax code so that consumers closed when
>>sessions timeout (or long polls stop coming).   But the queueConsumer
>>map in WebClient is static and key only by destination, which means:
>>
>> + You can only have one consumer per queue.    I can imagine Ajax
>>   apps that want to hand out messages to one of many clients and thus
>>   having multiple consumers would be a good way to do this.
>>
>> + As it stands, you don't know when the consumer is no longer needed.
>>   So it will live forever even if all sessions timeout.
>>
>>I've reworked the code so that queue consumers are associated with
>>the httpSession (just as topic consumers) and they use the common
>>jms session. unsubscribe now closes consumers as will long poll
>>timeout and ending the session.
>>
>>But I don't want to check it in, as I don't understand why the
>>consumers were static in the first place.  Some effort was put
>>into the static code and recovering the map from context
>>attributes etc.  So I'd like to double check that I'm not
>>missing something?
>>
>>I've attached a patch and would appreciate any comments.
>
>
> I think I remember now why it was done like that.
>
> Topic consumers don't really interfere with each other, they are
> atomic things; if the come and go it doesn't affect anyone else much.
>
> Queue consumers however do compete with each other;  creating a single
> consumer will effectively grab a whole bunch of messages ready to be
> dispatched to the client when ready (which is generally as soon as is
> possible with normal JMS clients).
>
> The worry is messages will just sit there in the consumer, not being
> consumed by available consumers. So I think the idea was for web
> clients to pull messages out of a single consumer to minimise the
> number of messages that get stuck in these inbound message buffers.
>
> However having a consumer per subscription & http session is much
> cleaner - so a better way is maybe to tweak ActiveMQ to work nicer in
> this slightly different web-subscription model. e.g. we can set the
> prefetchSize to 1 or even 0 to minimise the number of messages that
> just end up getting stuck in a consumer before they start being
> actually consumed.
>
> One change we should add to ActiveMQ to further minimise this problem
> is that if a consumer is created - and it then recieves messages and
> then does not process them within some time period, the messages are
> given back to the server so that they can be dispatched to another
> consumer (together with lowering the priority of the consumer). Then
> if lots of consumers are created that time out, the messages are taken
> back and given to a more active consumer.
>
> --
>
> James
> -------
> http://radio.weblogs.com/0112098/
>

Reply | Threaded
Open this post in threaded view
|

Re: Non static WebClient?

James Strachan-2
On 3/25/06, Greg Wilkins <[hidden email]> wrote:

>
> James,
>
> OK I understand that now.   Shall I'll modify the patch so that it can either
> share or create individual consumers for a queue.   Or do you think with
> the tweaks non-shared is sufficient?
>
> If we keep shared, I'll add a reference count on the shared consumer so we
> know when to clean it up.
>
> For the non-shared consumer, my current patch will close an idle
> consumer that is not being long-polled anymore.   Does a close
> return any prefetched messages to the queue?

Yes it does - so that should be good enough.

We can also ensure that the prefetch is set quite small for web based
consumers (say 1 message).

Lets go with the non-shared version as its a bit cleaner.

--

James
-------
http://radio.weblogs.com/0112098/