Quantcast

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6659

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6659

tabish
Repository: activemq
Updated Branches:
  refs/heads/master 1fd245054 -> 381a1ae20


https://issues.apache.org/jira/browse/AMQ-6659

Honor the sender settle mode from the client and ensure we always set
receiver mode to FIRST to reflect we don't support SECOND.  Adds tests
coverage and needed test client changes for this.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/381a1ae2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/381a1ae2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/381a1ae2

Branch: refs/heads/master
Commit: 381a1ae20611427eddb6a8743e4043c9917cfbbc
Parents: 1fd2450
Author: Timothy Bish <[hidden email]>
Authored: Fri Apr 21 10:51:35 2017 -0400
Committer: Timothy Bish <[hidden email]>
Committed: Fri Apr 21 10:51:35 2017 -0400

----------------------------------------------------------------------
 .../amqp/protocol/AmqpAbstractReceiver.java     |  7 ++
 .../transport/amqp/protocol/AmqpSender.java     | 13 ++-
 .../transport/amqp/client/AmqpReceiver.java     | 52 ++++++++++--
 .../transport/amqp/client/AmqpSender.java       | 46 ++++++++++-
 .../transport/amqp/client/AmqpSession.java      | 74 +++++++++++++++++
 .../amqp/interop/AmqpReceiverTest.java          | 75 +++++++++++++++++
 .../transport/amqp/interop/AmqpSenderTest.java  | 84 ++++++++++++++++++++
 7 files changed, 337 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/381a1ae2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
index 9ed465a..d8ed108 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp.protocol;
 
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.fusesource.hawtbuf.Buffer;
@@ -47,6 +48,12 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
     public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
         super(session, endpoint);
         this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
+
+        // We don't support second so enforce it as First and let remote decide what to do
+        this.endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        // Match what the sender mode is
+        this.endpoint.setSenderSettleMode(endpoint.getRemoteSenderSettleMode());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/381a1ae2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 23f8597..5ac95b2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -52,6 +52,7 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
@@ -77,8 +78,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
     private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
-    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
-    private final LinkedList<Delivery> dispatchedInTx = new LinkedList<Delivery>();
+    private final LinkedList<MessageDispatch> outbound = new LinkedList<>();
+    private final LinkedList<Delivery> dispatchedInTx = new LinkedList<>();
 
     private final ConsumerInfo consumerInfo;
     private AbstractSubscription subscription;
@@ -106,8 +107,14 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
         super(session, endpoint);
 
+        // We don't support second so enforce it as First and let remote decide what to do
+        this.endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        // Match what the sender mode is
+        this.endpoint.setSenderSettleMode(endpoint.getRemoteSenderSettleMode());
+
         this.consumerInfo = consumerInfo;
-        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+        this.presettle = getEndpoint().getSenderSettleMode() == SenderSettleMode.SETTLED;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/381a1ae2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index fbe36ff..cfc355c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -65,12 +65,15 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
 
     private final AtomicBoolean closed = new AtomicBoolean();
-    private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<AmqpMessage>();
+    private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<>();
 
     private final AmqpSession session;
     private final String address;
     private final String receiverId;
+
     private final Source userSpecifiedSource;
+    private final SenderSettleMode userSpecifiedSenderSettlementMode;
+    private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
 
     private String subscriptionName;
     private String selector;
@@ -84,13 +87,31 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
      * Create a new receiver instance.
      *
      * @param session
-     *  The parent session that created the receiver.
+     *        The parent session that created the receiver.
      * @param address
      *        The address that this receiver should listen on.
      * @param receiverId
      *        The unique ID assigned to this receiver.
      */
     public AmqpReceiver(AmqpSession session, String address, String receiverId) {
+        this(session, address, receiverId, null, null);
+    }
+
+    /**
+     * Create a new receiver instance.
+     *
+     * @param session
+     *  The parent session that created the receiver.
+     * @param address
+     *        The address that this receiver should listen on.
+     * @param receiverId
+     *        The unique ID assigned to this receiver.
+     * @param senderMode
+     *        The {@link SenderSettleMode} to use on open.
+     * @param receiverMode
+     *        The {@link ReceiverSettleMode} to use on open.
+     */
+    public AmqpReceiver(AmqpSession session, String address, String receiverId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
 
         if (address != null && address.isEmpty()) {
             throw new IllegalArgumentException("Address cannot be empty.");
@@ -100,6 +121,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         this.session = session;
         this.address = address;
         this.receiverId = receiverId;
+        this.userSpecifiedSenderSettlementMode = senderMode;
+        this.userSpecifiedReceiverSettlementMode = receiverMode;
     }
 
     /**
@@ -122,6 +145,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         this.userSpecifiedSource = source;
         this.address = source.getAddress();
         this.receiverId = receiverId;
+        this.userSpecifiedSenderSettlementMode = null;
+        this.userSpecifiedReceiverSettlementMode = null;
     }
 
     /**
@@ -715,12 +740,25 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         Receiver receiver = session.getEndpoint().receiver(receiverName);
         receiver.setSource(source);
         receiver.setTarget(target);
-        if (isPresettle()) {
-            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+
+        if (userSpecifiedSenderSettlementMode != null) {
+            receiver.setSenderSettleMode(userSpecifiedSenderSettlementMode);
+            if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
+                setPresettle(true);
+            }
+        } else {
+            if (isPresettle()) {
+                receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
+            } else {
+                receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            }
+        }
+
+        if (userSpecifiedReceiverSettlementMode != null) {
+            receiver.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
         } else {
-            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
         }
-        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
         setEndpoint(receiver);
 
@@ -788,7 +826,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     protected void configureSource(Source source) {
-        Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
+        Map<Symbol, DescribedType> filters = new HashMap<>();
         Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
                                          Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/381a1ae2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 8d28688..a48b856 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -66,7 +66,10 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
     private final AmqpSession session;
     private final String address;
     private final String senderId;
+
     private final Target userSpecifiedTarget;
+    private final SenderSettleMode userSpecifiedSenderSettlementMode;
+    private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
 
     private boolean presettle;
     private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@@ -90,6 +93,24 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
      *        The unique ID assigned to this sender.
      */
     public AmqpSender(AmqpSession session, String address, String senderId) {
+        this(session, address, senderId, null, null);
+    }
+
+    /**
+     * Create a new sender instance.
+     *
+     * @param session
+     *        The parent session that created the session.
+     * @param address
+     *        The address that this sender produces to.
+     * @param senderId
+     *        The unique ID assigned to this sender.
+     * @param senderMode
+     *        The {@link SenderSettleMode} to use on open.
+     * @param receiverMode
+     *        The {@link ReceiverSettleMode} to use on open.
+     */
+    public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
 
         if (address != null && address.isEmpty()) {
             throw new IllegalArgumentException("Address cannot be empty.");
@@ -99,6 +120,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
         this.address = address;
         this.senderId = senderId;
         this.userSpecifiedTarget = null;
+        this.userSpecifiedSenderSettlementMode = senderMode;
+        this.userSpecifiedReceiverSettlementMode = receiverMode;
     }
 
     /**
@@ -121,6 +144,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
         this.userSpecifiedTarget = target;
         this.address = target.getAddress();
         this.senderId = senderId;
+        this.userSpecifiedSenderSettlementMode = null;
+        this.userSpecifiedReceiverSettlementMode = null;
     }
 
     /**
@@ -302,12 +327,25 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
         Sender sender = session.getEndpoint().sender(senderName);
         sender.setSource(source);
         sender.setTarget(target);
-        if (presettle) {
-            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+
+        if (userSpecifiedSenderSettlementMode != null) {
+            sender.setSenderSettleMode(userSpecifiedSenderSettlementMode);
+            if (SenderSettleMode.SETTLED.equals(userSpecifiedSenderSettlementMode)) {
+                presettle = true;
+            }
+        } else {
+            if (presettle) {
+                sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+            } else {
+                sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            }
+        }
+
+        if (userSpecifiedReceiverSettlementMode != null) {
+            sender.setReceiverSettleMode(userSpecifiedReceiverSettlementMode);
         } else {
-            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
         }
-        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
         sender.setDesiredCapabilities(desiredCapabilities);
         sender.setOfferedCapabilities(offeredCapabilities);

http://git-wip-us.apache.org/repos/asf/activemq/blob/381a1ae2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 88cba94..b8d38e2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -28,6 +28,8 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Session;
 
@@ -142,6 +144,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
      * Create a sender instance using the given address
      *
      * @param address
+     *        the address to which the sender will produce its messages.
+     * @param senderSettlementMode
+     *        controls the settlement mode used by the created Sender
+     * @param receiverSettlementMode
+     *        controls the desired settlement mode used by the remote Receiver
+     *
+     * @return a newly created sender that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the sender.
+     */
+    public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
+        checkClosed();
+
+        final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode);
+        final ClientFuture request = new ClientFuture();
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                sender.setStateInspector(getStateInspector());
+                sender.open(request);
+                pumpToProtonTransport(request);
+            }
+        });
+
+        request.sync();
+
+        return sender;
+    }
+
+    /**
+     * Create a sender instance using the given address
+     *
+     * @param address
      *      the address to which the sender will produce its messages.
      * @param presettle
      *        controls if the created sender produces message that have already been marked settled.
@@ -350,6 +388,42 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     /**
+     * Create a receiver instance using the given address
+     *
+     * @param address
+     *        the address to which the receiver will subscribe for its messages.
+     * @param senderSettlementMode
+     *        controls the desired settlement mode used by the remote Sender
+     * @param receiverSettlementMode
+     *        controls the settlement mode used by the created Receiver
+     *
+     * @return a newly created receiver that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver createReceiver(String address, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
+        checkClosed();
+
+        final ClientFuture request = new ClientFuture();
+        final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId(), senderMode, receiverMode);
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                receiver.setStateInspector(getStateInspector());
+                receiver.open(request);
+                pumpToProtonTransport(request);
+            }
+        });
+
+        request.sync();
+
+        return receiver;
+    }
+
+    /**
      * Create a receiver instance using the given Source
      *
      * @param source

http://git-wip-us.apache.org/repos/asf/activemq/blob/381a1ae2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index f60af7b..7bdafb7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -47,6 +47,8 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.message.Message;
@@ -125,6 +127,79 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testSenderSettlementModeSettledIsHonored() throws Exception {
+        doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
+    }
+
+    @Test(timeout = 60000)
+    public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
+        doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
+    }
+
+    @Test(timeout = 60000)
+    public void testSenderSettlementModeMixedIsHonored() throws Exception {
+        doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
+    }
+
+    public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+
+        assertEquals(settleMode, receiver.getEndpoint().getRemoteSenderSettleMode());
+
+        receiver.close();
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiverSettlementModeSetToFirst() throws Exception {
+        doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiverSettlementModeSetToSecond() throws Exception {
+        doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
+    }
+
+    /*
+     * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that
+     * it always drops that back to FIRST to let the client know.  The client will need to
+     * check and react accordingly.
+     */
+    private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpReceiver receiver = session.createReceiver(
+            "queue://" + getTestName(), SenderSettleMode.MIXED, modeToUse);
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length);
+
+        assertEquals(ReceiverSettleMode.FIRST, receiver.getEndpoint().getRemoteReceiverSettleMode());
+
+        receiver.close();
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
     public void testCreateQueueReceiverWithJMSSelector() throws Exception {
         AmqpClient client = createAmqpClient();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/381a1ae2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
index da9e011..79ff275 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
@@ -37,6 +37,8 @@ import org.apache.activemq.transport.amqp.client.AmqpSupport;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.activemq.util.Wait;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
 import org.junit.Test;
@@ -49,6 +51,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
     @Test(timeout = 60000)
     public void testCreateQueueSender() throws Exception {
         AmqpClient client = createAmqpClient();
+
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
 
@@ -85,6 +88,87 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testSenderSettlementModeSettledIsHonored() throws Exception {
+        doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
+    }
+
+    @Test(timeout = 60000)
+    public void testSenderSettlementModeUnsettledIsHonored() throws Exception {
+        doTestSenderSettlementModeIsHonored(SenderSettleMode.UNSETTLED);
+    }
+
+    @Test(timeout = 60000)
+    public void testSenderSettlementModeMixedIsHonored() throws Exception {
+        doTestSenderSettlementModeIsHonored(SenderSettleMode.MIXED);
+    }
+
+    public void doTestSenderSettlementModeIsHonored(SenderSettleMode settleMode) throws Exception {
+        AmqpClient client = createAmqpClient();
+
+        client.setTraceFrames(true);
+
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender(
+            "queue://" + getTestName(), settleMode, ReceiverSettleMode.FIRST);
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        assertEquals(settleMode, sender.getEndpoint().getRemoteSenderSettleMode());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        sender.close();
+        assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiverSettlementModeSetToFirst() throws Exception {
+        doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.FIRST);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiverSettlementModeSetToSecond() throws Exception {
+        doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode.SECOND);
+    }
+
+    /*
+     * The Broker does not currently support ReceiverSettleMode of SECOND so we ensure that
+     * it always drops that back to FIRST to let the client know.  The client will need to
+     * check and react accordingly.
+     */
+    private void doTestReceiverSettlementModeForcedToFirst(ReceiverSettleMode modeToUse) throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = trackConnection(client.connect());
+        AmqpSession session = connection.createSession();
+
+        assertEquals(0, brokerService.getAdminView().getQueues().length);
+
+        AmqpSender sender = session.createSender(
+            "queue://" + getTestName(), SenderSettleMode.UNSETTLED, modeToUse);
+
+        assertEquals(1, brokerService.getAdminView().getQueues().length);
+        assertNotNull(getProxyToQueue(getTestName()));
+        assertEquals(1, brokerService.getAdminView().getQueueProducers().length);
+
+        assertEquals(ReceiverSettleMode.FIRST, sender.getEndpoint().getRemoteReceiverSettleMode());
+
+        sender.close();
+        assertEquals(0, brokerService.getAdminView().getQueueProducers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
     public void testSendMessageToQueue() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());

Loading...