selectors don't browse?

Jon Gorrono
I am trying to do some testing and limit the output from a queue that has
about 4500 messages in it. Each message has between 300 and 500 documents
attached, which I process individually. I want to test only specific
message types and not see the others, so I restart the client each cycle
with different selector header values. I only have the one client (STOMP)
running at any time (besides the admin web app), and all connections use
ack: client-individual

The problem I see is that the selector works fine if the 'top' of the queue
contains messages that match the selector, but I see nothing if the
selector matches messages deep in the queue. The client just waits forever
... it seems to me that the server is waiting for acks but since the
selector hides the initial messages from the client, there's nothing for it
to ack.

No producers are active; I restore the KahaDB store each time I need to
replenish the queue.

I have DEBUG log4j settings for the activemq packages, but the logs only
show checkpoints and expiry scans.


I would expect the selector to cause the server to browse down the queue
for messages it can dequeue, but that does not appear to be happening.


broker config:

<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" dataDirectory="${activemq.data}">

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" producerFlowControl="true">
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>

    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>

    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="64 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>

    <transportConnectors>
        <transportConnector name="openwire"
            uri="tcp://host:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="amqp"
            uri="amqp://host:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://host:61613"/>
    </transportConnectors>

    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans"
              class="org.apache.activemq.hooks.SpringContextHook"/>
    </shutdownHooks>

</broker>

--
Jon Gorrono
PGP Key: 0x5434509D - http{
pgp.mit.edu:11371/pks/lookup?search=0x5434509D&op=index}
http{middleware.ucdavis.edu}

Re: selectors don't browse?

Tim Bain
I think the behavior you're seeing has more to do with message cursors
(http://activemq.apache.org/message-cursors.html) than with missing acks.
Basically once the cursor is full of messages that don't match your
selectors, no new messages will come out of the message store into the
cursor until one of the messages already in the cursor is consumed, and you
don't have any consumers that are going to do that.

You have a few options:

1. Look for a way to increase the cursor size. I'm not sure that's
   possible; that web page doesn't describe how to do it and I've never
   tried.
2. Set expiration dates on your messages so that the ones early in the
   queue eventually clear out. You would have to set them each time you
   restart your test, and to different values so that the messages roll
   off over time rather than all at once.
3. Add another consumer, with no selector, that slowly consumes all
   messages to help clear the cursor. This is probably the easiest of the
   three: just put a small sleep in that consumer's message handler and
   you should eventually work through the problem.

Or maybe someone else has an even better solution that I haven't thought of.

Tim

On Thu, Apr 16, 2015 at 4:39 PM, Jon Gorrono <[hidden email]> wrote:

> The problem I see is that the selector works fine if the 'top' of the queue
> contains messages that match the selector, but I see nothing if the
> selector matches messages deep in the queue. The client just waits forever
> ... it seems to me that the server is waiting for acks but since the
> selector hides the initial messages from the client, there's nothing for it
> to ack.

Re: selectors don't browse?

ceposta
In reply to this post by Jon Gorrono
Can you give this a try on a 5.12 SNAPSHOT? We fixed some issues with the
cursor lists for queues, but there are still some slight issues in how the
queue pages in messages when its dispatch list is "full". You can force it
to page in more with a "browse", but there's probably a better way to fix
this. The fixes we made not long ago, which are in 5.12-SNAPSHOT, should
help, though.


--
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io

Re: selectors don't browse?

Jon Gorrono
In reply to this post by Tim Bain
Thanks for the response..

Yeah I could figure out a way to drain the queue, but it seemed to me like
something the server should be handling for the general case: I've
subscribed with a selector, why is the cursor for this connection filled
with anything that is not selected? I realize that may (or must) be an
architectural limitation, e.g. the cursors, as currently defined, are
'closer' to the queue than the selection algorithms. I could accomplish
this with a browsing connection and delayed, out-of-band storage of
ack-worthy messages, but, as I said, this seems central to the function of
a JMS server.

On Thu, Apr 16, 2015 at 8:43 PM, Tim Bain <[hidden email]> wrote:

> Basically once the cursor is full of messages that don't match your
> selectors, no new messages will come out of the message store into the
> cursor until one of the messages already in the cursor is consumed, and you
> don't have any consumers that are going to do that.

Re: selectors don't browse?

Jon Gorrono
In reply to this post by ceposta
Yeah, sure... I'll get the snapshot and try to give it a go tomorrow.

I may need to ask more about how you force the browse....

Thanks

On Tue, Apr 21, 2015 at 5:05 PM, Christian Posta <[hidden email]>
wrote:

> Can you give this a try on a 5.12 SNAPSHOT? we fixed some issues with the
> cursor lists for queues... but there is still some slight issues in how the
> queue pages in messages when it's dispatch list is "full"... you can force
> it to page in more with a "browse".. but there's probably a better way to
> fix this. Though the fixes we did not long ago that are in 5.12-SNAPSHOT
> should help.

Re: selectors don't browse?

Tim Bain
In reply to this post by Jon Gorrono
As I understand it, the answer is "because the cursor isn't for the
consumer, it's for all consumers and therefore it can't account for your
consumer's selector".  Keep in mind that for a queue, messages are
available to the next consumer to take them, so if you have one cursor per
consumer you'll have to synchronize between the cursors to make sure they
can't both take the same message; the simpler solution (which is
incomplete, as you've found) is to just have a single cursor and
synchronize removals from it, which is what has been implemented (as I
understand it).

I think an enhancement is clearly needed here, whether it's a cursor per
consumer, a second cursor that iterates over the messages that aren't yet
in the primary cursor and offers them only to the consumers that have
reached the end of the primary cursor, or something else.  I recommend you
submit an enhancement request in JIRA to make sure this stays on people's
radar and eventually gets implemented.  (First search JIRA to make sure
there isn't already a request for this enhancement; if there is, add
detail in the comments if necessary and vote for it.)

Tim
On Apr 21, 2015 9:08 PM, "Jon Gorrono" <[hidden email]> wrote:

> Yeah I could figure out a way to drain the queue, but it seemed to me like
> something the server should be handling for the general case: I've
> subscribed with a selector, why is the cursor for this connection filled
> with anything that is not selected?

Re: selectors don't browse?

gtully
The maxPageSize attribute of the destination policy entry controls how
many messages are held in memory to be dispatched. That value needs to be
large enough to deal with sparse selectors. Essentially, at the moment,
selectors only match against messages in memory. There is nothing that
will iterate over the store to find matches.
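
For reference, a minimal sketch of how that might look in the queue policy
entry from the config in the first post. The value 5000 is only an
assumption, picked to exceed the roughly 4500 messages in the queue (the
default is 200, as far as I know). I also believe the per-destination
memoryLimit (1mb in the original config) can stop page-in before
maxPageSize is reached when messages are large, so the raised memoryLimit
below is likewise an untested guess, and the broker-wide memoryUsage limit
may need to grow with it.

```xml
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!-- maxPageSize="5000" is an assumed value, chosen only to exceed
                 the ~4500 messages in this queue. memoryLimit="64mb" is also
                 an assumption, not a tested recommendation. -->
            <policyEntry queue=">" producerFlowControl="true"
                         memoryLimit="64mb" maxPageSize="5000"/>
        </policyEntries>
    </policyMap>
</destinationPolicy>
```

Paging that many large messages into memory has an obvious cost, so this is
probably better suited to a test setup like the one described here than to
production.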

On 22 April 2015 at 13:49, Tim Bain <[hidden email]> wrote:

> As I understand it, the answer is "because the cursor isn't for the
> consumer, it's for all consumers and therefore it can't account for your
> consumer's selector".  Keep in mind that for a queue, messages are
> available to the next consumer to take them, so if you have one cursor per
> consumer you'll have to synchronize between the cursors to make sure they
> can't both take the same message; the simpler solution (which is
> incomplete, as you've found) is to just have a single cursor and
> synchronize removals from it, which is what has been implemented (as I
> understand it).
>
> I think an enhancement is clearly needed here, whether it's a cursor per
> consumer or a second cursor that iterates over the messages that aren't yet
> in the cursor offering them to only the consumers that have reached the end
> of the primary cursor or something else.  I recommend you submit an
> enhancement request in JIRA to make sure this stays on people's radar and
> eventually gets implemented.  (Though first you should search JIRA to make
> sure there isn't already a request for this enhancement; if there is, add
> detail if necessary in the comments and vote for it.)
>
> Tim
> On Apr 21, 2015 9:08 PM, "Jon Gorrono" <[hidden email]> wrote:
>
>> Thanks for the response.
>>
>> Yeah, I could figure out a way to drain the queue, but it seemed to me like
>> something the server should be handling for the general case: I've
>> subscribed with a selector, so why is the cursor for this connection filled
>> with anything that is not selected? I realize that may/must be an
>> architectural limitation, e.g., the cursors, as currently defined, are
>> 'closer' to the queue than the selection algorithms... I could accomplish
>> this with a browsing connection and delayed/out-of-band storage of
>> ack-worthy messages... but, as I said, this seems central to the function
>> of a JMS server.
>>
>> On Thu, Apr 16, 2015 at 8:43 PM, Tim Bain <[hidden email]> wrote:
>>
>> > I think the behavior you're seeing has more to do with message cursors (
>> > http://activemq.apache.org/message-cursors.html) than with missing acks.
>> > Basically, once the cursor is full of messages that don't match your
>> > selectors, no new messages will come out of the message store into the
>> > cursor until one of the messages already in the cursor is consumed, and
>> > you don't have any consumers that are going to do that.
>> >
>> > You could look for a way to increase the cursor size (I'm not sure if
>> > that's possible; that web page doesn't describe how to do it and I've
>> > never tried), you could set message expiration dates on your messages so
>> > that the ones early in the queue eventually clear out (but then you have
>> > to set that each time you restart your test, and you have to set them to
>> > different values so that they roll off over time rather than all at once),
>> > or you could just add another consumer that slowly consumes all messages -
>> > no selector - to help clear the cursor (which is probably the easiest
>> > option of the three)...  Just put a small sleep in that consumer's message
>> > handler and you should eventually work through the problem.  Or maybe
>> > someone else has an even better solution that I haven't thought of.
>> >
>> > Tim
>> >
>> > On Thu, Apr 16, 2015 at 4:39 PM, Jon Gorrono <[hidden email]>
>> > wrote:
>> >
>> > > I am trying to do some testing and limit the output from a queue that
>> > > has about 4500 messages in it. Each message contains between 300 and 500
>> > > attached documents that I process individually. But I am trying to test
>> > > only specific message types and not see the others, so I restart the
>> > > client each cycle with different selector header values. I only have the
>> > > one client (stomp) running at any time (besides the admin web app)... all
>> > > connections are with ack: client-individual
>> > >
>> > > The problem I see is that the selector works fine if the 'top' of the
>> > > queue contains messages that match the selector, but I see nothing if the
>> > > selector matches messages deep in the queue. The client just waits
>> > > forever... it seems to me that the server is waiting for acks, but since
>> > > the selector hides the initial messages from the client, there's nothing
>> > > for it to ack.
>> > >
>> > > There are no producers active... I restore the kahadb each time I need
>> > > to replenish the queue
>> > >
>> > > I have DEBUG log4j settings for activemq packages but the logs only log
>> > > checkpoints and expiry scans
>> > >
>> > >
>> > > I would expect that the selector would cause the server to browse down
>> > > the queue for messages it can dequeue. But that does not appear to be
>> > > happening
>> > >
>> > >
>> > > broker config:
>> > >  <broker xmlns="http://activemq.apache.org/schema/core"
>> > > brokerName="localhost" dataDirectory="${activemq.data}">
>> > >
>> > >
>> > >         <destinationPolicy>
>> > >             <policyMap>
>> > >               <policyEntries>
>> > >                 <policyEntry topic=">" producerFlowControl="true">
>> > >
>> > >                   <pendingMessageLimitStrategy>
>> > >                     <constantPendingMessageLimitStrategy limit="1000"/>
>> > >                   </pendingMessageLimitStrategy>
>> > >                 </policyEntry>
>> > >                 <policyEntry queue=">" producerFlowControl="true"
>> > > memoryLimit="1mb">
>> > >
>> > >                 </policyEntry>
>> > >               </policyEntries>
>> > >             </policyMap>
>> > >         </destinationPolicy>
>> > >
>> > >
>> > >
>> > >         <managementContext>
>> > >             <managementContext createConnector="false"/>
>> > >         </managementContext>
>> > >
>> > >
>> > >         <persistenceAdapter>
>> > >             <kahaDB directory="${activemq.data}/kahadb"/>
>> > >         </persistenceAdapter>
>> > >
>> > >
>> > >
>> > >           <systemUsage>
>> > >             <systemUsage>
>> > >                 <memoryUsage>
>> > >                     <memoryUsage limit="64 mb"/>
>> > >                 </memoryUsage>
>> > >                 <storeUsage>
>> > >                     <storeUsage limit="100 gb"/>
>> > >                 </storeUsage>
>> > >                 <tempUsage>
>> > >                     <tempUsage limit="50 gb"/>
>> > >                 </tempUsage>
>> > >             </systemUsage>
>> > >         </systemUsage>
>> > >
>> > >
>> > >         <transportConnectors>
>> > >
>> > >         <transportConnector name="openwire"
>> > > uri="tcp://host:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>> > >         <transportConnector name="amqp"
>> > > uri="amqp://host:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>> > >         <transportConnector name="stomp" uri="stomp://host:61613"/>
>> > >         </transportConnectors>
>> > >
>> > >
>> > >         <shutdownHooks>
>> > >             <bean xmlns="http://www.springframework.org/schema/beans"
>> > > class="org.apache.activemq.hooks.SpringContextHook" />
>> > >         </shutdownHooks>
>> > >
>> > >     </broker>
>> > >
>> > > --
>> > > Jon Gorrono
>> > > PGP Key: 0x5434509D - http{
>> > > pgp.mit.edu:11371/pks/lookup?search=0x5434509D&op=index}
>> > > http{middleware.ucdavis.edu}
>> > >
>> >
>>
>>
>>
>> --
>> Jon Gorrono
>> PGP Key: 0x5434509D - http{
>> pgp.mit.edu:11371/pks/lookup?search=0x5434509D&op=index}
>> http{middleware.ucdavis.edu}
>>