[1/2] activemq-artemis git commit: ARTEMIS-1850 QueueControl.listDeliveringMessages returns empty result

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/2] activemq-artemis git commit: ARTEMIS-1850 QueueControl.listDeliveringMessages returns empty result

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master c0a40a161 -> 7c5470548


ARTEMIS-1850 QueueControl.listDeliveringMessages returns empty result

With AMQP protocol when some messages are received in a transaction,
calling JMX QueueControl.listDeliveringMessages() returns empty list
before the transaction is committed.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/72eadb20
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/72eadb20
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/72eadb20

Branch: refs/heads/master
Commit: 72eadb201d870b097c8659497823f27bf2401d6f
Parents: c0a40a1
Author: Howard Gao <[hidden email]>
Authored: Mon May 7 14:33:16 2018 +0800
Committer: Howard Gao <[hidden email]>
Committed: Tue Nov 6 20:23:32 2018 +0800

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 19 ++++
 .../transaction/ProtonTransactionHandler.java   | 11 ++-
 .../core/server/impl/ServerSessionImpl.java     | 10 +++
 .../spi/core/protocol/SessionCallback.java      |  5 ++
 .../integration/amqp/JMXManagementTest.java     | 92 ++++++++++++++++++++
 5 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 61816af..14c1042 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -56,6 +56,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -109,6 +110,8 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
 
+   private ProtonTransactionHandler transactionHandler;
+
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -690,6 +693,14 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   @Override
+   public Transaction getCurrentTransaction() {
+      if (this.transactionHandler != null) {
+         return this.transactionHandler.getCurrentTransaction();
+      }
+      return null;
+   }
+
    public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
       return protonSPI.getTransaction(txid, remove);
    }
@@ -740,6 +751,14 @@ public class AMQPSessionCallback implements SessionCallback {
       serverSession.removeProducer(name);
    }
 
+   public void setTransactionHandler(ProtonTransactionHandler transactionHandler) {
+      this.transactionHandler = transactionHandler;
+   }
+
+   public ProtonTransactionHandler getTransactionHandler() {
+      return this.transactionHandler;
+   }
+
 
    class AddressQueryCache<T> {
       SimpleString address;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 9ccc196..78a5b33 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 import java.nio.ByteBuffer;
 
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -47,6 +48,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
 
    private final int amqpCredit;
    private final int amqpLowMark;
+   private Transaction currentTx;
 
    final AMQPSessionCallback sessionSPI;
    final AMQPConnectionContext connection;
@@ -58,6 +60,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
       this.connection = connection;
       this.amqpCredit = connection.getAmqpCredits();
       this.amqpLowMark = connection.getAmqpLowCredits();
+      this.sessionSPI.setTransactionHandler(this);
    }
 
    @Override
@@ -100,6 +103,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             Binary txID = sessionSPI.newTransaction();
             Declared declared = new Declared();
             declared.setTxnId(txID);
+            currentTx = sessionSPI.getTransaction(txID, false);
             IOCallback ioAction = new IOCallback() {
                @Override
                public void done() {
@@ -115,7 +119,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
 
                @Override
                public void onError(int errorCode, String errorMessage) {
-
+                  currentTx = null;
                }
             };
             sessionSPI.afterIO(ioAction);
@@ -133,6 +137,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
                   try {
                      delivery.settle();
                      delivery.disposition(new Accepted());
+                     currentTx = null;
                   } finally {
                      connection.unlock();
                      connection.flush();
@@ -192,4 +197,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
       message.decode(encoded);
       return message;
    }
+
+   public Transaction getCurrentTransaction() {
+      return currentTx;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d13cd76..7ab353a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1884,6 +1884,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             return oper.getListOnConsumer(consumerId);
          }
       } else {
+         //amqp handles the transaction in callback
+         if (callback != null) {
+            Transaction transaction = callback.getCurrentTransaction();
+            if (transaction != null) {
+               RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+               if (operation != null) {
+                  return operation.getListOnConsumer(consumerId);
+               }
+            }
+         }
          return Collections.emptyList();
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index c4a2dbe..5577522 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 
 public interface SessionCallback {
@@ -93,4 +94,8 @@ public interface SessionCallback {
    default void close(boolean failed) {
 
    }
+
+   default Transaction getCurrentTransaction() {
+      return null;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
new file mode 100644
index 0000000..3be3e88
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Map;
+
+public class JMXManagementTest extends JMSClientTestSupport {
+
+   @Test
+   public void testListDeliveringMessages() throws Exception {
+      SimpleString queue = new SimpleString(getQueueName());
+
+      Connection connection1 = createConnection();
+      Connection connection2 = createConnection();
+      Session prodSession = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session consSession = connection2.createSession(true, Session.SESSION_TRANSACTED);
+
+      javax.jms.Queue jmsQueue = prodSession.createQueue(queue.toString());
+
+      QueueControl queueControl = createManagementControl(queue, queue);
+
+      MessageProducer producer = prodSession.createProducer(jmsQueue);
+      final int num = 20;
+
+      for (int i = 0; i < num; i++) {
+         TextMessage message = prodSession.createTextMessage("hello" + i);
+         producer.send(message);
+      }
+
+      connection2.start();
+      MessageConsumer consumer = consSession.createConsumer(jmsQueue);
+
+      for (int i = 0; i < num; i++) {
+         TextMessage msgRec = (TextMessage) consumer.receive(5000);
+         assertNotNull(msgRec);
+         assertEquals(msgRec.getText(), "hello" + i);
+      }
+
+      //before commit
+      assertEquals(num, queueControl.getDeliveringCount());
+
+      Map<String, Map<String, Object>[]> result = queueControl.listDeliveringMessages();
+      assertEquals(1, result.size());
+
+      Map<String, Object>[] msgMaps = result.entrySet().iterator().next().getValue();
+
+      assertEquals(num, msgMaps.length);
+
+      consSession.commit();
+      result = queueControl.listDeliveringMessages();
+
+      assertEquals(0, result.size());
+
+      consSession.close();
+      prodSession.close();
+
+      connection1.close();
+      connection2.close();
+   }
+
+   protected QueueControl createManagementControl(final SimpleString address,
+                                                  final SimpleString queue) throws Exception {
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer);
+
+      return queueControl;
+   }
+}

Reply | Threaded
Open this post in threaded view
|

[2/2] activemq-artemis git commit: This closes #2074

clebertsuconic-2
This closes #2074


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7c547054
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7c547054
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7c547054

Branch: refs/heads/master
Commit: 7c5470548a98405b2ca97ee4749e6258641daec1
Parents: c0a40a1 72eadb2
Author: Clebert Suconic <[hidden email]>
Authored: Tue Nov 6 22:00:26 2018 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Nov 6 22:00:26 2018 -0500

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 19 ++++
 .../transaction/ProtonTransactionHandler.java   | 11 ++-
 .../core/server/impl/ServerSessionImpl.java     | 10 +++
 .../spi/core/protocol/SessionCallback.java      |  5 ++
 .../integration/amqp/JMXManagementTest.java     | 92 ++++++++++++++++++++
 5 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------