Using ActiveMQ with more than one consumer instance

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Using ActiveMQ with more than one consumer instance

Moshe Recanati
Hi,
In our architecture we've 2 or more containers for each application for
high availiability puproses.
We're using activeMQ and I would like to implement the following behavior.

1. Pushing to a message to queue.
2. This message will be consumed only by *one *container (the first one
that will read it).
2. Once message processed successfully the consumer will update this
message can be acknowledged and neglected
3. I tried using transaction with commit and using CLIENT_ACKNOWLEDGE however
in both cases both consumers got the message and processed it.

I want only one consumer to process each message based on availability.

Please share the way to implement it.

Thank you
Moshe
Reply | Threaded
Open this post in threaded view
|

Re: Using ActiveMQ with more than one consumer instance

Tim Bain
The behavior you want is the default behavior for a queue, so if you're
seeing the behavior you describe, it probably means you're doing something
wrong with the code that's acknowledging the message. (That, or you're
using a topic, not a queue.) Can you post the code that acknowledges the
message?

Alternatively, can you use AUTO_ACK mode instead of CLIENT_ACK mode, to
eliminate any errors relating to your acknowledgement code?

Tim

On Sat, Oct 13, 2018, 1:33 PM Moshe Recanati <
[hidden email]> wrote:

> Hi,
> In our architecture we've 2 or more containers for each application for
> high availiability puproses.
> We're using activeMQ and I would like to implement the following behavior.
>
> 1. Pushing to a message to queue.
> 2. This message will be consumed only by *one *container (the first one
> that will read it).
> 2. Once message processed successfully the consumer will update this
> message can be acknowledged and neglected
> 3. I tried using transaction with commit and using CLIENT_ACKNOWLEDGE
> however
> in both cases both consumers got the message and processed it.
>
> I want only one consumer to process each message based on availability.
>
> Please share the way to implement it.
>
> Thank you
> Moshe
>
Reply | Threaded
Open this post in threaded view
|

Re: Using ActiveMQ with more than one consumer instance

Moshe Recanati
Hi Tim,
Thank you see my code below.
Keep in mind that I do want that message will be available until it
processed successfully.

// Establish a connection for the consumer.
    final Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// Create a session.
final Session consumerSession =
consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

final Destination consumerDestination = consumerSession.createQueue(queueName);

*public void recieveMessages(String queueName) throws Exception {*
























*// Create a message consumer from the session to the queue.final
MessageConsumer consumer =
consumerSession.createConsumer(consumerDestination);// Begin to wait for
messages.Queue queue = consumerSession.createQueue(queueName);QueueBrowser
queueBrowser = consumerSession.createBrowser(queue);Enumeration msgs =
queueBrowser.getEnumeration();while (msgs.hasMoreElements()) { //do your
things here ActiveMQTextMessage message = (ActiveMQTextMessage)
msgs.nextElement(); if (message == null) continue; //handle message
System.out.println("Message received in : " + message); try { String text =
message.getText(); JSONObject messageJson = new JSONObject(text);
consumer.receive(1000); String responseString = handleMessage(messageJson);
message.acknowledge();*

*.....*


Moshe

On Sun, Oct 14, 2018 at 5:52 AM Tim Bain <[hidden email]> wrote:

> The behavior you want is the default behavior for a queue, so if you're
> seeing the behavior you describe, it probably means you're doing something
> wrong with the code that's acknowledging the message. (That, or you're
> using a topic, not a queue.) Can you post the code that acknowledges the
> message?
>
> Alternatively, can you use AUTO_ACK mode instead of CLIENT_ACK mode, to
> eliminate any errors relating to your acknowledgement code?
>
> Tim
>
> On Sat, Oct 13, 2018, 1:33 PM Moshe Recanati <
> [hidden email]> wrote:
>
> > Hi,
> > In our architecture we've 2 or more containers for each application for
> > high availiability puproses.
> > We're using activeMQ and I would like to implement the following
> behavior.
> >
> > 1. Pushing to a message to queue.
> > 2. This message will be consumed only by *one *container (the first one
> > that will read it).
> > 2. Once message processed successfully the consumer will update this
> > message can be acknowledged and neglected
> > 3. I tried using transaction with commit and using CLIENT_ACKNOWLEDGE
> > however
> > in both cases both consumers got the message and processed it.
> >
> > I want only one consumer to process each message based on availability.
> >
> > Please share the way to implement it.
> >
> > Thank you
> > Moshe
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Using ActiveMQ with more than one consumer instance

Tim Bain
Have you confirmed, either via logging or a debugger, that
message.acknowledge() is being called? An exception thrown in
handleMessage() or the JSONObject() constructor would result in the
behavior you're describing.

Why are you calling consumer.receive(1000) after processing the message but
before acknowledging it?

Are you sure you want to use CLIENT_ACKNOWLEDGE mode instead of
INDIVIDUAL_ACKNOWLEDGE mode?

Tim

On Sat, Oct 13, 2018, 9:25 PM Moshe Recanati <
[hidden email]> wrote:

> Hi Tim,
> Thank you see my code below.
> Keep in mind that I do want that message will be available until it
> processed successfully.
>
> // Establish a connection for the consumer.
>     final Connection consumerConnection =
> connectionFactory.createConnection();
> consumerConnection.start();
> // Create a session.
> final Session consumerSession =
> consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>
> final Destination consumerDestination =
> consumerSession.createQueue(queueName);
>
> *public void recieveMessages(String queueName) throws Exception {*
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *// Create a message consumer from the session to the queue.final
> MessageConsumer consumer =
> consumerSession.createConsumer(consumerDestination);// Begin to wait for
> messages.Queue queue = consumerSession.createQueue(queueName);QueueBrowser
> queueBrowser = consumerSession.createBrowser(queue);Enumeration msgs =
> queueBrowser.getEnumeration();while (msgs.hasMoreElements()) { //do your
> things here ActiveMQTextMessage message = (ActiveMQTextMessage)
> msgs.nextElement(); if (message == null) continue; //handle message
> System.out.println("Message received in : " + message); try { String text =
> message.getText(); JSONObject messageJson = new JSONObject(text);
> consumer.receive(1000); String responseString = handleMessage(messageJson);
> message.acknowledge();*
>
> *.....*
>
>
> Moshe
>
> On Sun, Oct 14, 2018 at 5:52 AM Tim Bain <[hidden email]> wrote:
>
> > The behavior you want is the default behavior for a queue, so if you're
> > seeing the behavior you describe, it probably means you're doing
> something
> > wrong with the code that's acknowledging the message. (That, or you're
> > using a topic, not a queue.) Can you post the code that acknowledges the
> > message?
> >
> > Alternatively, can you use AUTO_ACK mode instead of CLIENT_ACK mode, to
> > eliminate any errors relating to your acknowledgement code?
> >
> > Tim
> >
> > On Sat, Oct 13, 2018, 1:33 PM Moshe Recanati <
> > [hidden email]> wrote:
> >
> > > Hi,
> > > In our architecture we've 2 or more containers for each application for
> > > high availiability puproses.
> > > We're using activeMQ and I would like to implement the following
> > behavior.
> > >
> > > 1. Pushing to a message to queue.
> > > 2. This message will be consumed only by *one *container (the first one
> > > that will read it).
> > > 2. Once message processed successfully the consumer will update this
> > > message can be acknowledged and neglected
> > > 3. I tried using transaction with commit and using CLIENT_ACKNOWLEDGE
> > > however
> > > in both cases both consumers got the message and processed it.
> > >
> > > I want only one consumer to process each message based on availability.
> > >
> > > Please share the way to implement it.
> > >
> > > Thank you
> > > Moshe
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Using ActiveMQ with more than one consumer instance

art.licis
In reply to this post by Moshe Recanati
Hi Moshe,

Looks like the problem is about how you consume messages. Instead of using
regular subscribe approach, you retain messages using queue browser:

QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();

This is wrong. Queue browser's purpose is to look at messages without
actually removing them from a queue. Therefore, your messages stay in the
queue and remain available for other consumers.

- Art




--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html