Quantcast

[1/2] activemq-artemis git commit: ARTEMIS-60 Validate AMQP sender applied TransactionState

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/2] activemq-artemis git commit: ARTEMIS-60 Validate AMQP sender applied TransactionState

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 8c310a2ce -> 7374d2f72


ARTEMIS-60 Validate AMQP sender applied TransactionState

Update the AMQP test client to allow for better inspection of the
delivery updates that happen during normal use.  Use those modifications
to check that when the broker's sender accepts and settles a non-settled
disposition, it adds a proper TransactionState disposition with the
correct outcome and txn-id in that state.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a0948928
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a0948928
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a0948928

Branch: refs/heads/master
Commit: a0948928c3b34788e6f371fd49eb4d07273ba08b
Parents: 8c310a2
Author: Timothy Bish <[hidden email]>
Authored: Mon Mar 20 16:46:18 2017 -0400
Committer: Timothy Bish <[hidden email]>
Committed: Mon Mar 20 16:46:18 2017 -0400

----------------------------------------------------------------------
 .../amqp/client/AmqpAbstractResource.java       | 12 +++-
 .../transport/amqp/client/AmqpConnection.java   |  3 +-
 .../transport/amqp/client/AmqpEventSink.java    |  5 +-
 .../transport/amqp/client/AmqpReceiver.java     |  4 +-
 .../transport/amqp/client/AmqpSender.java       | 14 +---
 .../amqp/client/AmqpTransactionCoordinator.java | 11 ++--
 .../integration/amqp/AmqpTransactionTest.java   | 67 ++++++++++++++++++++
 7 files changed, 94 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index 0ab4596..691c11f 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -242,7 +242,8 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
+      doDeliveryUpdate(delivery);
    }
 
    @Override
@@ -305,7 +306,14 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
    }
 
    protected void doDeliveryUpdate(Delivery delivery) {
-
+      AmqpValidator validator = getStateInspector();
+      if (validator != null) {
+         try {
+            validator.inspectDeliveryUpdate(delivery);
+         } catch (Throwable error) {
+            validator.markAsInvalid(error.getMessage());
+         }
+      }
    }
 
    //----- Private implementation utility methods ---------------------------//

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index fa44c02..76717fd 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -43,6 +43,7 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Event.Type;
@@ -697,7 +698,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                   break;
                case DELIVERY:
                   amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
-                  amqpEventSink.processDeliveryUpdates(this);
+                  amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext());
                   break;
                default:
                   break;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
index 1c511a5..5581328 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java
@@ -18,6 +18,8 @@ package org.apache.activemq.transport.amqp.client;
 
 import java.io.IOException;
 
+import org.apache.qpid.proton.engine.Delivery;
+
 /**
  * Interface used by classes that want to process AMQP events sent from
  * the transport layer.
@@ -53,9 +55,10 @@ public interface AmqpEventSink {
     * for the given endpoint.
     *
     * @param connection the AmqpConnection instance for easier access to fire events.
+    * @param delivery the Delivery that was updated.
     * @throws IOException if an error occurs while processing the update.
     */
-   void processDeliveryUpdates(AmqpConnection connection) throws IOException;
+   void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException;
 
    /**
     * Called when the Proton Engine signals an Flow related event has been triggered

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index cd76501..414f933 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -794,7 +794,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
       Delivery incoming = null;
       do {
          incoming = getEndpoint().current();
@@ -823,7 +823,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
          }
       } while (incoming != null);
 
-      super.processDeliveryUpdates(connection);
+      super.processDeliveryUpdates(connection, delivery);
    }
 
    private void processDelivery(Delivery incoming) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 350a201..0a41ce6 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
-import javax.jms.InvalidDestinationException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
@@ -26,6 +25,8 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.jms.InvalidDestinationException;
+
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
@@ -419,7 +420,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException {
       List<Delivery> toRemove = new ArrayList<>();
 
       for (Delivery delivery : pending) {
@@ -485,13 +486,4 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    public String toString() {
       return getClass().getSimpleName() + "{ address = " + address + "}";
    }
-
-   @Override
-   protected void doDeliveryUpdate(Delivery delivery) {
-      try {
-         getStateInspector().inspectDeliveryUpdate(delivery);
-      } catch (Throwable error) {
-         getStateInspector().markAsInvalid(error.getMessage());
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
index 2e1a3ab..bc1030e 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.TransactionRolledBackException;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.util.HashMap;
@@ -27,6 +24,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.TransactionRolledBackException;
+
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -67,7 +68,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
    }
 
    @Override
-   public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
+   public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
       try {
          Iterator<Delivery> deliveries = pendingDeliveries.iterator();
          while (deliveries.hasNext()) {
@@ -112,7 +113,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
             deliveries.remove();
          }
 
-         super.processDeliveryUpdates(connection);
+         super.processDeliveryUpdates(connection, delivery);
       } catch (Exception e) {
          throw IOExceptionSupport.create(e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0948928/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index d49d499..3a9d498 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.engine.Delivery;
@@ -920,6 +921,72 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
          sendConnection.close();
          consumerConnection.close();
       }
+   }
+
+   @Test(timeout = 30000)
+   public void testUnsettledTXMessageGetTransactedDispostion() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+      message.setText("Test-Message");
+      sender.send(message);
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.setStateInspector(new AmqpValidator() {
 
+         @Override
+         public void inspectDeliveryUpdate(Delivery delivery) {
+            if (delivery.remotelySettled()) {
+               LOG.info("Receiver got delivery update for: {}", delivery);
+               if (!(delivery.getRemoteState() instanceof TransactionalState)) {
+                  markAsInvalid("Transactionally acquire work no tagged as being in a transaction.");
+               } else {
+                  TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+                  if (!(txState.getOutcome() instanceof Accepted)) {
+                     markAsInvalid("Transaction state lacks any outcome");
+                  } else if (txState.getTxnId() == null) {
+                     markAsInvalid("Transaction state lacks any TX Id");
+                  }
+               }
+
+               if (!(delivery.getLocalState() instanceof TransactionalState)) {
+                  markAsInvalid("Transactionally acquire work no tagged as being in a transaction.");
+               } else {
+                  TransactionalState txState = (TransactionalState) delivery.getLocalState();
+                  if (!(txState.getOutcome() instanceof Accepted)) {
+                     markAsInvalid("Transaction state lacks any outcome");
+                  } else if (txState.getTxnId() == null) {
+                     markAsInvalid("Transaction state lacks any TX Id");
+                  }
+               }
+
+               TransactionalState localTxState = (TransactionalState) delivery.getLocalState();
+               TransactionalState remoteTxState = (TransactionalState) delivery.getRemoteState();
+
+               if (!localTxState.getTxnId().equals(remoteTxState)) {
+                  markAsInvalid("Message not enrolled in expected transaction");
+               }
+            }
+         }
+      });
+
+      session.begin();
+
+      assertTrue(session.isInTransaction());
+
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.accept(false);
+
+      session.commit();
+
+      sender.getStateInspector().assertValid();
+
+      connection.close();
    }
 }

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/2] activemq-artemis git commit: This closes #1109

clebertsuconic-2
This closes #1109


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7374d2f7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7374d2f7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7374d2f7

Branch: refs/heads/master
Commit: 7374d2f728ea60cd790709cc563161b60371e691
Parents: 8c310a2 a094892
Author: Clebert Suconic <[hidden email]>
Authored: Mon Mar 20 18:04:15 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Mon Mar 20 18:04:15 2017 -0400

----------------------------------------------------------------------
 .../amqp/client/AmqpAbstractResource.java       | 12 +++-
 .../transport/amqp/client/AmqpConnection.java   |  3 +-
 .../transport/amqp/client/AmqpEventSink.java    |  5 +-
 .../transport/amqp/client/AmqpReceiver.java     |  4 +-
 .../transport/amqp/client/AmqpSender.java       | 14 +---
 .../amqp/client/AmqpTransactionCoordinator.java | 11 ++--
 .../integration/amqp/AmqpTransactionTest.java   | 67 ++++++++++++++++++++
 7 files changed, 94 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


Loading...