[1/2] activemq-artemis git commit: ARTEMIS-59 Accept transacted message using AMQP TransactionState

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/2] activemq-artemis git commit: ARTEMIS-59 Accept transacted message using AMQP TransactionState

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master e29c46373 -> 8c310a2ce


ARTEMIS-59 Accept transacted message using AMQP TransactionState

When a message is sent to the broker with a TransactionState indicating
that the message should be included in a transaction the disposition from
the broker indicating acceptance of the message should be done using a
TransactionState value that contained the TX ID and the Accepted
disposition.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/29796151
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/29796151
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/29796151

Branch: refs/heads/master
Commit: 29796151c3cdacc8bcb70c4cd213aca5864400b9
Parents: e29c463
Author: Timothy Bish <[hidden email]>
Authored: Mon Mar 20 12:33:22 2017 -0400
Committer: Timothy Bish <[hidden email]>
Committed: Mon Mar 20 12:42:56 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 11 +++-
 .../integration/amqp/AmqpTransactionTest.java   | 56 ++++++++++++++++++--
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/29796151/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 3592dbc..034cb72 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -60,6 +60,7 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
@@ -406,7 +407,15 @@ public class AMQPSessionCallback implements SessionCallback {
             @Override
             public void done() {
                synchronized (connection.getLock()) {
-                  delivery.disposition(Accepted.getInstance());
+                  if (delivery.getRemoteState() instanceof TransactionalState) {
+                     TransactionalState txAccepted = new TransactionalState();
+                     txAccepted.setOutcome(Accepted.getInstance());
+                     txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
+
+                     delivery.disposition(txAccepted);
+                  } else {
+                     delivery.disposition(Accepted.getInstance());
+                  }
                   delivery.settle();
                }
                connection.flush();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/29796151/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index 41bc5e7..d49d499 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
@@ -24,10 +29,6 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -38,15 +39,23 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test various aspects of Transaction support.
  */
 public class AmqpTransactionTest extends AmqpClientTestSupport {
 
+   private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionTest.class);
+
    @Test(timeout = 30000)
    public void testBeginAndCommitTransaction() throws Exception {
       AmqpClient client = createAmqpClient();
@@ -78,6 +87,45 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 30000)
+   public void testSentTransactionalMessageIsSettleWithTransactionalDisposition() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+
+      AmqpSender sender = session.createSender(getTestName());
+      sender.setStateInspector(new AmqpValidator() {
+
+         @Override
+         public void inspectDeliveryUpdate(Delivery delivery) {
+            if (delivery.remotelySettled()) {
+               DeliveryState state = delivery.getRemoteState();
+               if (state instanceof TransactionalState) {
+                  LOG.debug("Remote settled with TX state: {}", state);
+               } else {
+                  LOG.warn("Remote settled with non-TX state: {}", state);
+                  markAsInvalid("Remote did not settled with TransactionState.");
+               }
+            }
+         }
+      });
+
+      session.begin();
+
+      assertTrue(session.isInTransaction());
+
+      AmqpMessage message = new AmqpMessage();
+      message.setText("Test-Message");
+      sender.send(message);
+
+      session.commit();
+
+      sender.getStateInspector().assertValid();
+
+      connection.close();
+   }
+
+   @Test(timeout = 30000)
    public void testBeginAndRollbackTransaction() throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/2] activemq-artemis git commit: This closes #1107

clebertsuconic-2
This closes #1107


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8c310a2c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8c310a2c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8c310a2c

Branch: refs/heads/master
Commit: 8c310a2ce81ead4c9a54adc43edbbda715e740dc
Parents: e29c463 2979615
Author: Clebert Suconic <[hidden email]>
Authored: Mon Mar 20 15:20:46 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Mon Mar 20 15:20:46 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 11 +++-
 .../integration/amqp/AmqpTransactionTest.java   | 56 ++++++++++++++++++--
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


Loading...