[activemq-artemis] branch master updated (05b9bf6 -> fa6a008)


clebertsuconic-2
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 05b9bf6  This closes #2980
     new b76700c  NO-JIRA fix JMSMessageConsumerTest.testTimedOutWaitingForWriteLogOnConsumer
     new 1c15e1f  NO-JIRA fixing stale comments
     new 72429e1  ARTEMIS-2325 ack handler being invoked twice
     new fa6a008  This closes #2976

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/client/impl/ClientProducerImpl.java       | 35 ++++----
 .../impl/SendAcknowledgementHandlerWrapper.java    | 60 ++++++++++++++
 .../protocol/core/impl/ActiveMQSessionContext.java | 11 ++-
 .../artemis/spi/core/remoting/SessionContext.java  |  2 +
 .../jms/client/ActiveMQMessageProducer.java        | 92 +++++++++-------------
 .../integration/amqp/JMSMessageConsumerTest.java   |  3 +
 .../SessionSendAcknowledgementHandlerTest.java     | 72 +++++++++++++++++
 .../JmsProducerCompletionListenerTest.java         |  9 ++-
 8 files changed, 199 insertions(+), 85 deletions(-)
 create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java


[activemq-artemis] 01/04: NO-JIRA fix JMSMessageConsumerTest.testTimedOutWaitingForWriteLogOnConsumer

clebertsuconic-2

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b76700c18fd8dd3dd112f7ba6772266830288544
Author: Justin Bertram <[hidden email]>
AuthorDate: Sun Feb 9 16:50:31 2020 -0600

    NO-JIRA fix JMSMessageConsumerTest.testTimedOutWaitingForWriteLogOnConsumer
---
 .../artemis/tests/integration/amqp/JMSMessageConsumerTest.java         | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index a634b44..14c019b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -41,6 +41,7 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
@@ -803,6 +804,8 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
    @Test(timeout = 30000)
    public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
       String name = "exampleQueue1";
+      // disable auto-delete as it causes thrashing during the test
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteQueues(false));
 
       final int numMessages = 40;
 


[activemq-artemis] 02/04: NO-JIRA fixing stale comments

clebertsuconic-2

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 1c15e1f8128f5d24435cd64424c989e3dfd94999
Author: Justin Bertram <[hidden email]>
AuthorDate: Thu Feb 13 11:14:00 2020 -0600

    NO-JIRA fixing stale comments
---
 .../apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
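
The change in this commit replaces a silently swallowed JMSException with a debug-level log entry. A minimal sketch of that pattern follows, using only java.util.logging rather than the logger the Artemis client actually uses. The Resettable interface and the resetQuietly helper are hypothetical names introduced for illustration; they are not part of Artemis or JMS.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class ResetLoggingSketch {
   private static final Logger LOGGER = Logger.getLogger(ResetLoggingSketch.class.getName());

   // Hypothetical stand-in for a JMS message whose reset() may throw a checked exception.
   interface Resettable {
      void reset() throws Exception;
   }

   // Attempts the reset. A failure is logged at FINE (roughly "debug") and otherwise ignored.
   // Returns true when the reset succeeded, false when an exception was logged.
   static boolean resetQuietly(Resettable message) {
      try {
         message.reset();
         return true;
      } catch (Exception e) {
         LOGGER.log(Level.FINE, "ignoring exception", e);
         return false;
      }
   }
}
```

The exception is still ignored as far as control flow goes, but it now leaves a trace that can be switched on when diagnosing a problem.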

diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index ccc5d7c..a3eecec 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -539,14 +539,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
                try {
                   ((StreamMessage) jmsMessage).reset();
                } catch (JMSException e) {
-                  // HORNETQ-1209 XXX ignore?
+                  logger.debug("ignoring exception", e);
                }
             }
             if (jmsMessage instanceof BytesMessage) {
                try {
                   ((BytesMessage) jmsMessage).reset();
                } catch (JMSException e) {
-                  // HORNETQ-1209 XXX ignore?
+                  logger.debug("ignoring exception", e);
                }
             }
 


[activemq-artemis] 03/04: ARTEMIS-2325 ack handler being invoked twice

clebertsuconic-2

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 72429e1e49b9d23c25e040f0e0e1f02b7db2bba3
Author: Justin Bertram <[hidden email]>
AuthorDate: Fri Feb 7 16:29:17 2020 -0600

    ARTEMIS-2325 ack handler being invoked twice
---
 .../core/client/impl/ClientProducerImpl.java       | 35 ++++----
 .../impl/SendAcknowledgementHandlerWrapper.java    | 60 ++++++++++++++
 .../protocol/core/impl/ActiveMQSessionContext.java | 11 ++-
 .../artemis/spi/core/remoting/SessionContext.java  |  2 +
 .../jms/client/ActiveMQMessageProducer.java        | 92 +++++++++-------------
 .../SessionSendAcknowledgementHandlerTest.java     | 72 +++++++++++++++++
 .../JmsProducerCompletionListenerTest.java         |  9 ++-
 7 files changed, 196 insertions(+), 85 deletions(-)
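
The ClientProducerImpl hunks in this commit change two decisions on the send path: whether a send blocks, and whether the handler is invoked through session.scheduleConfirmation when no confirmation window is enabled. The sketch below restates those two boolean conditions as plain functions so they can be read in isolation. SendPathSketch and its method names are hypothetical; handlers are modelled as plain Object references because only their null-ness matters here.

```java
final class SendPathSketch {

   // Mirrors the expression in doSend after this commit: a send blocks only when blocking is
   // configured and neither a per-send handler nor a session-level handler is present.
   static boolean sendBlocking(boolean sendBlockingConfig, Object handler, Object sessionHandler) {
      return sendBlockingConfig && handler == null && sessionHandler == null;
   }

   // Mirrors the condition guarding session.scheduleConfirmation(handler, message): with a
   // handler but no confirmation window, the handler is called after the send is done.
   static boolean scheduleConfirmation(Object handler, boolean confirmationWindowEnabled) {
      return handler != null && !confirmationWindowEnabled;
   }
}
```

For example, sendBlocking(true, null, new Object()) is false: a session-level handler alone is enough to make the send non-blocking, which is the case the added clause in doSend covers.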

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index b015be6..a55a5b1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -26,7 +26,6 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
@@ -43,8 +42,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
    private static final Logger logger = Logger.getLogger(ClientProducerImpl.class);
 
-   private static boolean confirmationNotSetLogged = false;
-
    private final SimpleString address;
 
    private final ClientSessionInternal session;
@@ -118,14 +115,14 @@ public class ClientProducerImpl implements ClientProducerInternal {
    public void send(final Message msg) throws ActiveMQException {
       checkClosed();
 
-      doSend(null, msg, null);
+      send(null, msg, sessionContext.getSendAcknowledgementHandler());
    }
 
    @Override
    public void send(final SimpleString address1, final Message msg) throws ActiveMQException {
       checkClosed();
 
-      doSend(address1, msg, null);
+      send(address1, msg, sessionContext.getSendAcknowledgementHandler());
    }
 
    @Override
@@ -138,24 +135,20 @@ public class ClientProducerImpl implements ClientProducerInternal {
                     Message message,
                     SendAcknowledgementHandler handler) throws ActiveMQException {
       checkClosed();
-      boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled();
-      if (confirmationWindowEnabled) {
-         doSend(address1, message, handler);
-      } else {
-         doSend(address1, message, null);
-         if (handler != null) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("Handler was used on producing messages towards address " + (address1 == null ? null : address1.toString()) + " however there is no confirmationWindowEnabled");
-            }
 
-            if (!confirmationNotSetLogged) {
-               // will log thisonly once
-               ActiveMQClientLogger.LOGGER.confirmationNotSet();
-            }
+      if (handler != null) {
+         handler = new SendAcknowledgementHandlerWrapper(handler);
+      }
+
+      doSend(address1, message, handler);
 
-            // if there is no confirmation enabled, we will at least call the handler after the sent is done
-            session.scheduleConfirmation(handler, message);
+      if (handler != null && !session.isConfirmationWindowEnabled()) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Handler was used on producing messages towards address " + address1 + " however there is no confirmationWindowEnabled");
          }
+
+         // if there is no confirmation enabled, we will at least call the handler after the sent is done
+         session.scheduleConfirmation(handler, message);
       }
    }
 
@@ -265,7 +258,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
          final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
          // if Handler != null, we will send non blocking
-         final boolean sendBlocking = sendBlockingConfig && handler == null;
+         final boolean sendBlocking = sendBlockingConfig && handler == null && sessionContext.getSendAcknowledgementHandler() == null;
 
          session.workDone();
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
new file mode 100644
index 0000000..e935b10
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+
+public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHandler {
+
+   private SendAcknowledgementHandler wrapped;
+
+   /**
+    * It's possible that a SendAcknowledgementHandler might be called twice due to subsequent
+    * packet confirmations on the same connection. Using this boolean avoids that possibility.
+    * A new SendAcknowledgementHandlerWrapper is created for each message sent so once it's
+    * called once it will never be called again.
+    */
+   private volatile boolean active = true;
+
+   public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped) {
+      this.wrapped = wrapped;
+   }
+
+   @Override
+   public void sendAcknowledged(Message message) {
+      if (active) {
+         try {
+            wrapped.sendAcknowledged(message);
+         } finally {
+            active = false;
+         }
+      }
+   }
+
+   @Override
+   public void sendFailed(Message message, Exception e) {
+      if (active) {
+         try {
+            wrapped.sendFailed(message, e);
+         } finally {
+            active = false;
+         }
+      }
+   }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 26c405c..4843aaf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -233,12 +233,6 @@ public class ActiveMQSessionContext extends SessionContext {
             } else {
                handler.sendFailed(message, exception);
             }
-         } else if (sendAckHandler != null) {
-            if (exception == null) {
-               sendAckHandler.sendAcknowledged(message);
-            } else {
-               sendAckHandler.sendFailed(message, exception);
-            }
          }
       }
    };
@@ -283,6 +277,11 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
+   public SendAcknowledgementHandler getSendAcknowledgementHandler() {
+      return this.sendAckHandler;
+   }
+
+   @Override
    public void createSharedQueue(SimpleString address,
                                  SimpleString queueName,
                                  RoutingType routingType,
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 9335204..db27511 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -164,6 +164,8 @@ public abstract class SessionContext {
 
    public abstract void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
 
+   public abstract SendAcknowledgementHandler getSendAcknowledgementHandler();
+
    /**
     * Creates a shared queue using the routing type set by the Address.  If the Address supports more than one type of delivery
     * then the default delivery mode (MULTICAST) is used.
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index a3eecec..0c77132 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -34,8 +34,6 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -513,14 +511,6 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       private final ActiveMQMessageProducer producer;
 
       /**
-       * It's possible that this SendAcknowledgementHandler might be called twice due to subsequent
-       * packet confirmations on the same connection. Using this boolean avoids that possibility.
-       * A new CompletionListenerWrapper is created for each message sent so once it's called once
-       * it will never be called again.
-       */
-      private AtomicBoolean active = new AtomicBoolean(true);
-
-      /**
        * @param jmsMessage
        * @param producer
        */
@@ -534,62 +524,56 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
 
       @Override
       public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
-         if (active.get()) {
-            if (jmsMessage instanceof StreamMessage) {
-               try {
-                  ((StreamMessage) jmsMessage).reset();
-               } catch (JMSException e) {
-                  logger.debug("ignoring exception", e);
-               }
-            }
-            if (jmsMessage instanceof BytesMessage) {
-               try {
-                  ((BytesMessage) jmsMessage).reset();
-               } catch (JMSException e) {
-                  logger.debug("ignoring exception", e);
-               }
+         if (jmsMessage instanceof StreamMessage) {
+            try {
+               ((StreamMessage) jmsMessage).reset();
+            } catch (JMSException e) {
+               logger.debug("ignoring exception", e);
             }
-
+         }
+         if (jmsMessage instanceof BytesMessage) {
             try {
-               producer.connection.getThreadAwareContext().setCurrentThread(true);
-               completionListener.onCompletion(jmsMessage);
-            } finally {
-               producer.connection.getThreadAwareContext().clearCurrentThread(true);
-               active.set(false);
+               ((BytesMessage) jmsMessage).reset();
+            } catch (JMSException e) {
+               logger.debug("ignoring exception", e);
             }
          }
+
+         try {
+            producer.connection.getThreadAwareContext().setCurrentThread(true);
+            completionListener.onCompletion(jmsMessage);
+         } finally {
+            producer.connection.getThreadAwareContext().clearCurrentThread(true);
+         }
       }
 
       @Override
       public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
-         if (active.get()) {
-            if (jmsMessage instanceof StreamMessage) {
-               try {
-                  ((StreamMessage) jmsMessage).reset();
-               } catch (JMSException e) {
-                  // HORNETQ-1209 XXX ignore?
-               }
+         if (jmsMessage instanceof StreamMessage) {
+            try {
+               ((StreamMessage) jmsMessage).reset();
+            } catch (JMSException e) {
+               // HORNETQ-1209 XXX ignore?
             }
-            if (jmsMessage instanceof BytesMessage) {
-               try {
-                  ((BytesMessage) jmsMessage).reset();
-               } catch (JMSException e) {
-                  // HORNETQ-1209 XXX ignore?
-               }
+         }
+         if (jmsMessage instanceof BytesMessage) {
+            try {
+               ((BytesMessage) jmsMessage).reset();
+            } catch (JMSException e) {
+               // HORNETQ-1209 XXX ignore?
             }
+         }
 
-            try {
-               producer.connection.getThreadAwareContext().setCurrentThread(true);
-               if (exception instanceof ActiveMQException) {
-                  exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
-               } else if (exception instanceof ActiveMQInterruptedException) {
-                  exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
-               }
-               completionListener.onException(jmsMessage, exception);
-            } finally {
-               producer.connection.getThreadAwareContext().clearCurrentThread(true);
-               active.set(false);
+         try {
+            producer.connection.getThreadAwareContext().setCurrentThread(true);
+            if (exception instanceof ActiveMQException) {
+               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
+            } else if (exception instanceof ActiveMQInterruptedException) {
+               exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
             }
+            completionListener.onException(jmsMessage, exception);
+         } finally {
+            producer.connection.getThreadAwareContext().clearCurrentThread(true);
          }
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
index 95ef668..8b88073 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -29,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -152,6 +154,76 @@ public class SessionSendAcknowledgementHandlerTest extends ActiveMQTestBase {
       Assert.assertTrue("producer specific handler must have acked, " + producerHandler, producerHandler.latch.await(5, TimeUnit.SECONDS));
    }
 
+   @Test
+   public void testHandlerOnSend() throws Exception {
+      final int MSG_COUNT = 750;
+      ServerLocator locator = createInVMNonHALocator();
+      locator.setConfirmationWindowSize(256);
+
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession();
+      ClientProducer producer = session.createProducer(address);
+      final AtomicInteger count = new AtomicInteger(0);
+      for (int i = 0; i < MSG_COUNT; i++) {
+         ClientMessage message = session.createMessage(true);
+         producer.send(message, message1 -> count.incrementAndGet());
+      }
+      Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
+   }
+
+   @Test
+   public void testHandlerOnSendWithAnonymousProducer() throws Exception {
+      final int MSG_COUNT = 750;
+      ServerLocator locator = createInVMNonHALocator();
+      locator.setConfirmationWindowSize(256);
+
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession();
+      final AtomicInteger count = new AtomicInteger(0);
+      ClientProducer producer = session.createProducer();
+      for (int i = 0; i < MSG_COUNT; i++) {
+         ClientMessage message = session.createMessage(true);
+         producer.send(address, message, message1 -> count.incrementAndGet());
+      }
+      Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
+   }
+
+   @Test
+   public void testHandlerOnSession() throws Exception {
+      final int MSG_COUNT = 750;
+      ServerLocator locator = createInVMNonHALocator();
+      locator.setConfirmationWindowSize(256);
+
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession();
+      final AtomicInteger count = new AtomicInteger(0);
+      session.setSendAcknowledgementHandler(message1 -> count.incrementAndGet());
+      ClientProducer producer = session.createProducer(address);
+      for (int i = 0; i < MSG_COUNT; i++) {
+         ClientMessage message = session.createMessage(true);
+         producer.send(message);
+      }
+      Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
+   }
+
+   @Test
+   public void testHandlerOnSessionWithAnonymousProducer() throws Exception {
+      final int MSG_COUNT = 750;
+      ServerLocator locator = createInVMNonHALocator();
+      locator.setConfirmationWindowSize(256);
+
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession();
+      final AtomicInteger count = new AtomicInteger(0);
+      session.setSendAcknowledgementHandler(message1 -> count.incrementAndGet());
+      ClientProducer producer = session.createProducer();
+      for (int i = 0; i < MSG_COUNT; i++) {
+         ClientMessage message = session.createMessage(true);
+         producer.send(address, message);
+      }
+      Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
+   }
+
    public static final class LatchAckHandler implements SendAcknowledgementHandler {
 
       public CountDownLatch latch;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
index 3020310..8049783 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
@@ -43,7 +44,7 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class JmsProducerCompletionListenerTest extends JMSTestBase {
 
-   static final int TOTAL_MSGS = 20;
+   static final int TOTAL_MSGS = 200;
 
    private JMSContext context;
    private JMSProducer producer;
@@ -85,7 +86,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
       JMSConsumer consumer = context.createConsumer(queue);
       sendMessages(context, producer, queue, TOTAL_MSGS);
       receiveMessages(consumer, 0, TOTAL_MSGS, true);
-
+      assertEquals(TOTAL_MSGS, cl.completion.get());
       context.close();
       Assert.assertTrue("completion listener should be called", cl.completionLatch.await(3, TimeUnit.SECONDS));
    }
@@ -191,7 +192,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
 
    public static final class CountingCompletionListener implements CompletionListener {
 
-      public int completion;
+      public AtomicInteger completion = new AtomicInteger(0);
       public int error;
       public CountDownLatch completionLatch;
       public Message lastMessage;
@@ -202,7 +203,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
 
       @Override
       public void onCompletion(Message message) {
-         completion++;
+         completion.incrementAndGet();
          completionLatch.countDown();
          lastMessage = message;
       }
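
The once-only behaviour that SendAcknowledgementHandlerWrapper adds, and that the new tests check by counting callbacks, can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the Artemis code: AckHandler is a hypothetical stand-in for SendAcknowledgementHandler (the real interface takes a Message and also has sendFailed), and the sketch uses AtomicBoolean.compareAndSet where the commit uses a volatile boolean cleared in a finally block.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Artemis' SendAcknowledgementHandler.
interface AckHandler {
   void sendAcknowledged(String message);
}

// Forwards the first acknowledgement to the wrapped handler and drops any later ones.
final class OnceOnlyAckHandler implements AckHandler {
   private final AckHandler wrapped;
   // Swapped-in technique: compareAndSet flips the flag before delegating, so a second
   // caller can never pass the check. The commit itself uses a volatile boolean.
   private final AtomicBoolean active = new AtomicBoolean(true);

   OnceOnlyAckHandler(AckHandler wrapped) {
      this.wrapped = wrapped;
   }

   @Override
   public void sendAcknowledged(String message) {
      if (active.compareAndSet(true, false)) {
         wrapped.sendAcknowledged(message);
      }
   }
}

final class OnceOnlyDemo {
   // Simulates msgCount sends whose confirmation arrives twice and returns how many
   // times the user handler actually ran.
   static int countAcks(int msgCount) {
      AtomicInteger count = new AtomicInteger();
      AckHandler userHandler = m -> count.incrementAndGet();
      for (int i = 0; i < msgCount; i++) {
         // One wrapper per message, as in the commit.
         AckHandler perMessage = new OnceOnlyAckHandler(userHandler);
         perMessage.sendAcknowledged("msg-" + i);
         perMessage.sendAcknowledged("msg-" + i); // duplicate confirmation, dropped
      }
      return count.get();
   }
}
```

Calling OnceOnlyDemo.countAcks(750) mirrors the shape of the added tests: 750 sends with duplicated confirmations still yield exactly 750 handler invocations.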


[activemq-artemis] 04/04: This closes #2976

clebertsuconic-2

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit fa6a008fa9121c1391fe5009d4a55ff3d2f89388
Merge: 05b9bf6 72429e1
Author: Clebert Suconic <[hidden email]>
AuthorDate: Fri Feb 14 13:12:49 2020 -0500

    This closes #2976

 .../core/client/impl/ClientProducerImpl.java       | 35 ++++----
 .../impl/SendAcknowledgementHandlerWrapper.java    | 60 ++++++++++++++
 .../protocol/core/impl/ActiveMQSessionContext.java | 11 ++-
 .../artemis/spi/core/remoting/SessionContext.java  |  2 +
 .../jms/client/ActiveMQMessageProducer.java        | 92 +++++++++-------------
 .../integration/amqp/JMSMessageConsumerTest.java   |  3 +
 .../SessionSendAcknowledgementHandlerTest.java     | 72 +++++++++++++++++
 .../JmsProducerCompletionListenerTest.java         |  9 ++-
 8 files changed, 199 insertions(+), 85 deletions(-)