activemq-artemis git commit: ARTEMIS-2100 address routing-type overridden on attaching AMQP sender

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

activemq-artemis git commit: ARTEMIS-2100 address routing-type overridden on attaching AMQP sender

martyntaylor
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x ade0003c3 -> 5b7f2d5c2


ARTEMIS-2100 address routing-type overridden on attaching AMQP sender

AMQPSender has to honor the already existing multicast routingType
of an address when attempting to create a new queue on it

(cherry picked from commit b71c1448914cbd25d53ba11720966bf8c2323c07)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5b7f2d5c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5b7f2d5c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5b7f2d5c

Branch: refs/heads/2.6.x
Commit: 5b7f2d5c2af97c719d0529513d8db96351176863
Parents: ade0003
Author: Francesco Nigro <[hidden email]>
Authored: Wed Oct 24 15:45:53 2018 +0200
Committer: Martyn Taylor <[hidden email]>
Committed: Mon Nov 5 10:52:13 2018 +0000

----------------------------------------------------------------------
 .../proton/ProtonServerReceiverContext.java     |  13 ++-
 .../amqp/AmqpSenderRoutingTypeTest.java         | 116 +++++++++++++++++++
 2 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5b7f2d5c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 0758714..b0cfba0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -19,11 +19,13 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -229,8 +231,15 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             }
          }
       }
-
-      return sessionSPI.getDefaultRoutingType(address);
+      final AddressInfo addressInfo = sessionSPI.getAddress(address);
+      if (addressInfo != null && !addressInfo.getRoutingTypes().isEmpty()) {
+         if (addressInfo.getRoutingTypes().size() == 1 && addressInfo.getRoutingType() == RoutingType.MULTICAST) {
+            return RoutingType.MULTICAST;
+         }
+      }
+      RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address);
+      defaultRoutingType = defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType;
+      return defaultRoutingType;
    }
 
    /*

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5b7f2d5c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java
new file mode 100644
index 0000000..0d46798
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderRoutingTypeTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AmqpSenderRoutingTypeTest extends JMSClientTestSupport {
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.setJournalType(JournalType.NIO);
+      Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
+      if (map.size() == 0) {
+         AddressSettings as = new AddressSettings();
+         as.setDefaultAddressRoutingType(RoutingType.ANYCAST);
+         map.put("#", as);
+      }
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE";
+   }
+
+   @Test
+   public void testAMQPSenderHonourRoutingTypeOfExistingAddress() throws Exception {
+      RoutingType routingType = server.getConfiguration().getAddressesSettings().get("#").getDefaultAddressRoutingType();
+      Assert.assertEquals(RoutingType.ANYCAST, routingType);
+      try (ActiveMQConnection coreConnection = (ActiveMQConnection) createCoreConnection();
+           ClientSession clientSession = coreConnection.getSessionFactory().createSession()) {
+         RoutingType addressRoutingType = RoutingType.MULTICAST;
+         SimpleString address = SimpleString.toSimpleString("myTopic_" + UUID.randomUUID().toString());
+         clientSession.createAddress(address, addressRoutingType, false);
+         ClientSession.AddressQuery addressQuery = clientSession.addressQuery(address);
+         Assert.assertTrue(addressQuery.isExists());
+         Assert.assertTrue(addressQuery.getQueueNames().isEmpty());
+         AmqpClient client = createAmqpClient(guestUser, guestPass);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(address.toString());
+         try {
+            ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
+            Assert.assertFalse(queueQuery.isExists());
+            Assert.assertEquals(addressRoutingType, queueQuery.getRoutingType());
+         } finally {
+            sender.close();
+            session.close();
+            connection.close();
+         }
+      }
+
+   }
+
+   @Test
+   public void testAMQPSenderCreateQueueWithDefaultRoutingTypeIfAddressDoNotExist() throws Exception {
+      RoutingType defaultRoutingType = server.getConfiguration().getAddressesSettings().get("#").getDefaultAddressRoutingType();
+      Assert.assertEquals(RoutingType.ANYCAST, defaultRoutingType);
+      try (ActiveMQConnection coreConnection = (ActiveMQConnection) createCoreConnection();
+           ClientSession clientSession = coreConnection.getSessionFactory().createSession()) {
+         SimpleString address = SimpleString.toSimpleString("myTopic_" + UUID.randomUUID().toString());
+         ClientSession.AddressQuery addressQuery = clientSession.addressQuery(address);
+         Assert.assertFalse(addressQuery.isExists());
+         Assert.assertTrue(addressQuery.getQueueNames().isEmpty());
+         AmqpClient client = createAmqpClient(guestUser, guestPass);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(address.toString());
+         try {
+            addressQuery = clientSession.addressQuery(address);
+            Assert.assertTrue(addressQuery.isExists());
+            Assert.assertThat(addressQuery.getQueueNames(), CoreMatchers.hasItem(address));
+            ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
+            Assert.assertTrue(queueQuery.isExists());
+            Assert.assertEquals(defaultRoutingType, queueQuery.getRoutingType());
+         } finally {
+            sender.close();
+            session.close();
+            connection.close();
+         }
+      }
+
+   }
+}