put a message in queue using org.apache.activemq.broker.Broker

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

put a message in queue using org.apache.activemq.broker.Broker

Altounisi
Hello,

I was trying to implement an interceptor to log some events happening in the broker. I want the events to be logged as messages in queue. Thus, the monitoring client could access the log by consuming messages from that queue.

(I didn't uses advisory topics because I want to dequeue treated messages so that they will not be treated many times).

So I used interceptors as described here.
This example worked correctly for me. However when I do this modification (to put the log as messages in a queue), I doesn't work any more. (The modification is in Bold)


public class MyBroker extends BrokerFilter {
     public MyBroker(Broker next) {
        super(next);                
    }
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {      
         
        factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("monitoring.notifications");
        producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage();
        message.setText("addConnection "+info.getConnectionId()+"   !"+info.getClientId());
        producer.send(message);
        session.close();
        connection.close();

        super.addConnection(context, info);
    }  
}


This seemed logic for me since my modification passes throw the interceptor witch makes infinite calls for this class.
For this reason, I thought to take profit of the object "next" in this implementation which is of type org.apache.activemq.broker.Broker to put a message in the queue.

Could you please help me ?

Regards,
Reply | Threaded
Open this post in threaded view
|

Re: put a message in queue using org.apache.activemq.broker.Broker

christopher.l.shannon
As you have found out you shouldn't try and use the JMS client api from inside the broker, as that is for client code. I'm not entirely sure I understand why you can't use advisory topics but if you want to send a message to another destination from inside the broker there are lots of good examples inside the broker itself.  One good example in particular is the Statistics Broker class, StatisticsBroker.java

If you take a look at the send and sendStats methods, you can see how a new message can be sent to a destination.  In your case the key is that you would want to create a message using ActiveMQTextMessage and a destination using the ActiveMQDestination class and then configure a ProducerBrokerExchange so you can send your new message to the destination you want.

Hopefully this helps.
Reply | Threaded
Open this post in threaded view
|

Re: put a message in queue using org.apache.activemq.broker.Broker

Altounisi
In reply to this post by Altounisi
Thank you very much christopher.l.shannon.
Your reply helped me to solve the problem.
This is the new code (the modifications are in bold) for any one who may be interested. Your critics are welcomed of course.

Regards,


public class MyBroker extends BrokerFilter {
     public MyBroker(Broker next) {
        super(next);                
    }
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {      
        sendNotification(context,"addConnection",info.getConnectionId().toString(),info.getClientId(),info.getClientIp());
        super.addConnection(context, info);
    }
    public void sendNotification (ConnectionContext context,String notificationType,String id,String clientId,String clientIp) throws Exception
    {
    ActiveMQDestination destination = ActiveMQDestination.createDestination("monitoring.notifications",ActiveMQDestination.QUEUE_TYPE);
    ActiveMQMapMessage msg = new ActiveMQMapMessage();
    msg.setString("notificationType",notificationType);
    msg.setString("id",id);
    msg.setString("clientId",clientId);
    msg.setString("clientIp",clientIp);
    msg.setPersistent(false);
        msg.setTimestamp(System.currentTimeMillis());
        msg.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
        msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
        msg.setDestination(destination);
        msg.setResponseRequired(false);
        ProducerId producerId = new ProducerId("monitorNotifier");
        msg.setProducerId(producerId);
        msg.setMessageId(new MessageId(producerId, this.messageIdGenerator.getNextSequenceId()));
        boolean originalFlowControl = context.isProducerFlowControl();
        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
        producerExchange.setConnectionContext(context);
        producerExchange.setMutable(true);
        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
        try {
            context.setProducerFlowControl(false);
            this.next.send(producerExchange, msg);
        } finally {
            context.setProducerFlowControl(originalFlowControl);
        }
    }

}
Reply | Threaded
Open this post in threaded view
|

Re: put a message in queue using org.apache.activemq.broker.Broker

christopher.l.shannon
I'm glad that helped.  At a quick glance your new code looks pretty good.
The important thing was being able to only send the message to the "next"
broker so that your broker filter doesn't get called in an infinite loop
and your new code does this.  Your new code is also much more efficient
than before since you aren't trying to open up a new connection and session
for each message.

-Chris

On Tue, Jun 9, 2015 at 10:46 AM, Altounisi <[hidden email]> wrote:

> Thank you very much christopher.l.shannon.
> Your reply helped me to solve the problem.
> This is the new code (the modifications are in bold) for any one who may be
> interested. Your critics are welcomed of course.
>
> Regards,
>
>
> public class MyBroker extends BrokerFilter {
>      public MyBroker(Broker next) {
>         super(next);
>     }
>     public void addConnection(ConnectionContext context, ConnectionInfo
> info) throws Exception {
>
>
> *sendNotification(context,"addConnection",info.getConnectionId().toString(),info.getClientId(),info.getClientIp());*
>         super.addConnection(context, info);
>     }
>     *public void sendNotification (ConnectionContext context,String
> notificationType,String id,String clientId,String clientIp) throws
> Exception
>     {
>         ActiveMQDestination destination =
>
> ActiveMQDestination.createDestination("monitoring.notifications",ActiveMQDestination.QUEUE_TYPE);
>         ActiveMQMapMessage msg = new ActiveMQMapMessage();
>         msg.setString("notificationType",notificationType);
>         msg.setString("id",id);
>         msg.setString("clientId",clientId);
>         msg.setString("clientIp",clientIp);
>         msg.setPersistent(false);
>         msg.setTimestamp(System.currentTimeMillis());
>         msg.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
>         msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
>         msg.setDestination(destination);
>         msg.setResponseRequired(false);
>         ProducerId producerId = new ProducerId("monitorNotifier");
>         msg.setProducerId(producerId);
>         msg.setMessageId(new MessageId(producerId,
> this.messageIdGenerator.getNextSequenceId()));
>         boolean originalFlowControl = context.isProducerFlowControl();
>         final ProducerBrokerExchange producerExchange = new
> ProducerBrokerExchange();
>         producerExchange.setConnectionContext(context);
>         producerExchange.setMutable(true);
>         producerExchange.setProducerState(new ProducerState(new
> ProducerInfo()));
>         try {
>             context.setProducerFlowControl(false);
>             this.next.send(producerExchange, msg);
>         } finally {
>             context.setProducerFlowControl(originalFlowControl);
>         }
>     }*
> }
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/put-a-message-in-queue-using-org-apache-activemq-broker-Broker-tp4697413p4697470.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>