[1/5] activemq-artemis git commit: ARTEMIS-1416 Queue is not autocreated if address already exists

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/5] activemq-artemis git commit: ARTEMIS-1416 Queue is not autocreated if address already exists

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master b0c83073e -> 07eb1d25b


ARTEMIS-1416 Queue is not autocreated if address already exists

- Fix on core and amqp.
- Add test to verify amqp's current large message behavior.
- Add test to openwire also just to verify.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f3ace6af
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f3ace6af
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f3ace6af

Branch: refs/heads/master
Commit: f3ace6afd726dc8e3c1c58c76e3fad3d5cfa357d
Parents: b0c8307
Author: Howard Gao <[hidden email]>
Authored: Mon Oct 30 12:47:23 2017 +0800
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Nov 3 18:25:23 2017 -0400

----------------------------------------------------------------------
 .../jms/client/ActiveMQMessageProducer.java     |  11 +-
 .../amqp/broker/AMQPSessionCallback.java        |  23 +--
 .../integration/amqp/QueueAutoCreationTest.java | 161 +++++++++++++++++++
 .../LargeMessageQueueAutoCreationTest.java      | 158 ++++++++++++++++++
 4 files changed, 343 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 3121a88..9f86e49 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -420,7 +420,16 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
                      throw new InvalidDestinationException("Destination " + address + " does not exist");
                   }
                } else {
-                  connection.addKnownDestination(address);
+                  ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
+                  if (queueQuery.isExists()) {
+                     connection.addKnownDestination(address);
+                  } else if (destination.isQueue() && query.isAutoCreateQueues()) {
+                     if (destination.isTemporary()) {
+                        clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
+                     } else {
+                        clientSession.createQueue(address, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers());
+                     }
+                  }
                }
             } catch (ActiveMQQueueExistsException e) {
                // The queue was created by another client/admin between the query check and send create queue packet

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 19f6351..7a7a41e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -284,11 +284,14 @@ public class AMQPSessionCallback implements SessionCallback {
             // The address may have been created by another thread in the mean time.  Catch and do nothing.
          }
          bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
-      } else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
-         try {
-            serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
-         } catch (ActiveMQQueueExistsException e) {
-            // The queue may have been created by another thread in the mean time.  Catch and do nothing.
+      } else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
+         QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress);
+         if (!queueBinding.isExists()) {
+            try {
+               serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true);
+            } catch (ActiveMQQueueExistsException e) {
+               // The queue may have been created by another thread in the mean time.  Catch and do nothing.
+            }
          }
          bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
       }
@@ -394,14 +397,16 @@ public class AMQPSessionCallback implements SessionCallback {
          message.setAddress(new SimpleString(address));
       } else {
          // Anonymous relay must set a To value
-         if (message.getAddress() == null) {
+         address = message.getAddress();
+         if (address == null) {
             rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
             return;
          }
+      }
 
-         if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) {
-            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
-         }
+      //here check queue-autocreation
+      if (!bindingQuery(address, RoutingType.ANYCAST)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
       }
 
       OperationContext oldcontext = recoverContext();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java
new file mode 100644
index 0000000..f6c8b22
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
+public class QueueAutoCreationTest extends JMSClientTestSupport {
+
+   Queue queue1;
+   Random random = new Random();
+   ActiveMQConnection testConn;
+   ClientSession clientSession;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      String randomSuffix = new BigInteger(130, random).toString(32);
+      testConn = (ActiveMQConnection)createCoreConnection();
+      clientSession = testConn.getSessionFactory().createSession();
+      queue1 = createQueue("queue1_" + randomSuffix);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      testConn.close();
+      super.tearDown();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE";
+   }
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.setJournalType(JournalType.NIO);
+      Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
+      if (map.size() == 0) {
+         AddressSettings as = new AddressSettings();
+         map.put("#", as);
+      }
+      Map.Entry<String, AddressSettings> entry = map.entrySet().iterator().next();
+      AddressSettings settings = entry.getValue();
+      settings.setAutoCreateQueues(true);
+      System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues());
+   }
+
+
+   protected Queue createQueue(final String queueName) throws Exception {
+      SimpleString address = SimpleString.toSimpleString(queueName);
+      clientSession.createAddress(address, RoutingType.ANYCAST, false);
+      return new ActiveMQQueue(queueName);
+   }
+
+   @Test(timeout = 30000)
+   public void testSmallString() throws Exception {
+      sendStringOfSize(1024, false);
+   }
+
+   @Test(timeout = 30000)
+   public void testHugeString() throws Exception {
+      //amqp doesn't support large message receive.
+      //using core to receive, it can verify
+      //that the large message is indeed stored in core
+      //via amqp send.
+      sendStringOfSize(1024 * 1024, true);
+   }
+
+   private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws JMSException {
+
+      Connection conn = this.createConnection();
+
+      try {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = session.createProducer(queue1);
+
+         TextMessage m = session.createTextMessage();
+
+         m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < msgSize) {
+            buffer.append(UUIDGenerator.getInstance().generateStringUUID());
+         }
+
+         final String originalString = buffer.toString();
+
+         m.setText(originalString);
+
+         prod.send(m);
+
+         conn.close();
+
+         if (useCoreReceive) {
+            conn = createCoreConnection();
+         } else {
+            conn = createConnection();
+         }
+
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         conn.start();
+
+         TextMessage rm = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(rm);
+
+         String str = rm.getText();
+         Assert.assertEquals(originalString, str);
+      } finally {
+         if (conn != null) {
+            conn.close();
+         }
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java
new file mode 100644
index 0000000..b3f224d
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Random;
+
+//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
+@RunWith(Parameterized.class)
+public class LargeMessageQueueAutoCreationTest extends BasicOpenWireTest {
+
+   Queue queue1;
+   Random random = new Random();
+   ActiveMQConnection testConn;
+   ClientSession clientSession;
+
+   @Parameterized.Parameter
+   public boolean usingCore;
+
+   @Parameterized.Parameters(name = "isCore={0}")
+   public static Collection<Object[]> params() {
+      return Arrays.asList(new Object[][]{{true}, {false}});
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      String randomSuffix = new BigInteger(130, random).toString(32);
+      testConn = (ActiveMQConnection)coreCf.createConnection();
+      clientSession = testConn.getSessionFactory().createSession();
+      queue1 = createCoreQueue("queue1_" + randomSuffix);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      testConn.close();
+      super.tearDown();
+   }
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      serverConfig.setJournalType(JournalType.NIO);
+      Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
+      Map.Entry<String, AddressSettings> entry = map.entrySet().iterator().next();
+      AddressSettings settings = entry.getValue();
+      settings.setAutoCreateQueues(true);
+      System.out.println("server cofg, isauto? " + entry.getValue().isAutoCreateQueues());
+   }
+
+
+   protected Queue createCoreQueue(final String queueName) throws Exception {
+      SimpleString address = SimpleString.toSimpleString(queueName);
+      clientSession.createAddress(address, RoutingType.ANYCAST, false);
+      return new ActiveMQQueue(queueName);
+   }
+
+   @Test(timeout = 30000)
+   public void testSmallString() throws Exception {
+      sendStringOfSize(1024);
+   }
+
+   @Test(timeout = 30000)
+   public void testHugeString() throws Exception {
+      sendStringOfSize(1024 * 1024);
+   }
+
+   private void sendStringOfSize(int msgSize) throws JMSException {
+
+      ConnectionFactory factoryToUse = usingCore ? coreCf : factory;
+
+      Connection conn = factoryToUse.createConnection();
+
+      try {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = session.createProducer(queue1);
+
+         TextMessage m = session.createTextMessage();
+
+         m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < msgSize) {
+            buffer.append(UUIDGenerator.getInstance().generateStringUUID());
+         }
+
+         final String originalString = buffer.toString();
+
+         m.setText(originalString);
+
+         prod.send(m);
+
+         conn.close();
+
+         conn = factoryToUse.createConnection();
+
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         conn.start();
+
+         TextMessage rm = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(rm);
+
+         String str = rm.getText();
+         Assert.assertEquals(originalString, str);
+      } finally {
+         if (conn != null) {
+            conn.close();
+         }
+      }
+   }
+}
\ No newline at end of file

Reply | Threaded
Open this post in threaded view
|

[2/5] activemq-artemis git commit: ARTEMIS-1416 Fix regressions in Joram tests

clebertsuconic-2
ARTEMIS-1416 Fix regressions in Joram tests

This closes #1621


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec13ed6d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec13ed6d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec13ed6d

Branch: refs/heads/master
Commit: ec13ed6df0fb2c92ab9a9c9e6af9259a07be0fdb
Parents: f3ace6a
Author: Howard Gao <[hidden email]>
Authored: Wed Nov 1 23:34:05 2017 +0800
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Nov 3 18:26:03 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPSessionCallback.java |  9 +++++++--
 .../amqp/proton/ProtonServerReceiverContext.java  | 18 +++++++++++++++---
 .../jtests/jms/framework/PubSubTestCase.java      |  3 +--
 3 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec13ed6d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 7a7a41e..53b9b4f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
@@ -386,7 +387,8 @@ public class AMQPSessionCallback implements SessionCallback {
       ((ServerConsumer) consumer).receiveCredits(-1);
    }
 
-   public void serverSend(final Transaction transaction,
+   public void serverSend(final ProtonServerReceiverContext context,
+                          final Transaction transaction,
                           final Receiver receiver,
                           final Delivery delivery,
                           String address,
@@ -405,7 +407,10 @@ public class AMQPSessionCallback implements SessionCallback {
       }
 
       //here check queue-autocreation
-      if (!bindingQuery(address, RoutingType.ANYCAST)) {
+      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+
+      RoutingType routingType = context.getRoutingType(receiver);
+      if (!bindingQuery(address, routingType)) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec13ed6d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index bcab2ea..014b9f9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -58,6 +58,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    protected final AMQPSessionCallback sessionSPI;
 
+   private RoutingType defRoutingType;
+
    /*
     The maximum number of credits we will allocate to clients.
     This number is also used by the broker when refresh client credits.
@@ -98,12 +100,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
       if (target != null) {
          if (target.getDynamic()) {
+            defRoutingType = getRoutingType(target.getCapabilities());
             // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
             // will be deleted on closing of the session
             address = sessionSPI.tempQueueName();
 
             try {
-               sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
+               sessionSPI.createTemporaryQueue(address, defRoutingType);
             } catch (ActiveMQSecurityException e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(e.getMessage());
             } catch (Exception e) {
@@ -118,8 +121,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             address = target.getAddress();
 
             if (address != null && !address.isEmpty()) {
+               defRoutingType = getRoutingType(target.getCapabilities());
                try {
-                  if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) {
+                  if (!sessionSPI.bindingQuery(address, defRoutingType)) {
                      throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
                   }
                } catch (ActiveMQAMQPNotFoundException e) {
@@ -177,6 +181,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       flow(amqpCredits, minCreditRefresh);
    }
 
+   public RoutingType getRoutingType(Receiver receiver) {
+      if (receiver == this.receiver) {
+         return defRoutingType;
+      }
+      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+      return target != null ? getRoutingType(target.getCapabilities()) : getRoutingType((Symbol[])null);
+   }
+
    private RoutingType getRoutingType(Symbol[] symbols) {
       if (symbols != null) {
          for (Symbol symbol : symbols) {
@@ -223,7 +235,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
          }
 
-         sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
+         sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
 
          flow(amqpCredits, minCreditRefresh);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec13ed6d/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java
----------------------------------------------------------------------
diff --git a/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java b/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java
index d5792c0..d340024 100644
--- a/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java
+++ b/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java
@@ -161,8 +161,7 @@ public abstract class PubSubTestCase extends JMSTestCase {
          subscriberTCF = null;
          subscriberSession = null;
          subscriberConnection = null;
+         super.tearDown();
       }
-
-      super.tearDown();
    }
 }

Reply | Threaded
Open this post in threaded view
|

[3/5] activemq-artemis git commit: ARTEMIS-1416 Fixing qpid AMQP tests

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1416 Fixing qpid AMQP tests

This will fix tests from https://git-wip-us.apache.org/repos/asf/qpid-interop-test.git

Notice that the previous 2 committs here are needed


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e9b39c8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e9b39c8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e9b39c8

Branch: refs/heads/master
Commit: 0e9b39c82522cb487b7e22d9e650ddb8d4091731
Parents: ec13ed6
Author: Clebert Suconic <[hidden email]>
Authored: Thu Nov 2 15:55:04 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Nov 3 18:27:24 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  8 ++++----
 .../proton/ProtonServerReceiverContext.java     | 21 ++++++++++++--------
 2 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e9b39c8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 53b9b4f..667d57a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -269,9 +269,11 @@ public class AMQPSessionCallback implements SessionCallback {
          queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
       }
 
-      if (queueQueryResult.getRoutingType() != routingType) {
+      // if auto-create we will return whatever type was used before
+      if (!queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) {
          throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
       }
+
       return queueQueryResult;
    }
 
@@ -407,9 +409,7 @@ public class AMQPSessionCallback implements SessionCallback {
       }
 
       //here check queue-autocreation
-      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
-
-      RoutingType routingType = context.getRoutingType(receiver);
+      RoutingType routingType = context.getRoutingType(receiver, RoutingType.ANYCAST);
       if (!bindingQuery(address, routingType)) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e9b39c8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 014b9f9..15318d5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -58,8 +58,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    protected final AMQPSessionCallback sessionSPI;
 
-   private RoutingType defRoutingType;
-
    /*
     The maximum number of credits we will allocate to clients.
     This number is also used by the broker when refresh client credits.
@@ -98,6 +96,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       // We don't currently support SECOND so enforce that the answer is anlways FIRST
       receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
+      RoutingType defRoutingType;
+
       if (target != null) {
          if (target.getDynamic()) {
             defRoutingType = getRoutingType(target.getCapabilities());
@@ -181,15 +181,16 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       flow(amqpCredits, minCreditRefresh);
    }
 
-   public RoutingType getRoutingType(Receiver receiver) {
-      if (receiver == this.receiver) {
-         return defRoutingType;
-      }
+   public RoutingType getRoutingType(Receiver receiver, RoutingType defaultType) {
       org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
-      return target != null ? getRoutingType(target.getCapabilities()) : getRoutingType((Symbol[])null);
+      return target != null ? getRoutingType(target.getCapabilities(), defaultType) : getRoutingType((Symbol[])null, defaultType);
    }
 
    private RoutingType getRoutingType(Symbol[] symbols) {
+      return getRoutingType(symbols, null);
+   }
+
+   private RoutingType getRoutingType(Symbol[] symbols, RoutingType defaultType) {
       if (symbols != null) {
          for (Symbol symbol : symbols) {
             if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@@ -200,7 +201,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          }
       }
 
-      return sessionSPI.getDefaultRoutingType(address);
+      if (defaultType != null) {
+         return defaultType;
+      } else {
+         return sessionSPI.getDefaultRoutingType(address);
+      }
    }
 
    /*

Reply | Threaded
Open this post in threaded view
|

[4/5] activemq-artemis git commit: ARTEMIS-1416 Fixing testsuite

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1416 Fixing testsuite


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/559a7048
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/559a7048
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/559a7048

Branch: refs/heads/master
Commit: 559a704818bf91af0a3ea92cfccead3e62c30238
Parents: 0e9b39c
Author: Clebert Suconic <[hidden email]>
Authored: Fri Nov 3 18:25:07 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Nov 3 18:27:42 2017 -0400

----------------------------------------------------------------------
 .../core/protocol/core/ServerSessionPacketHandler.java    |  2 +-
 .../activemq/artemis/core/server/BindingQueryResult.java  | 10 ++++++++++
 .../artemis/core/server/impl/ActiveMQServerImpl.java      |  6 ++++--
 .../amqp/BrokerDefinedAnycastConsumerTest.java            |  7 +++++++
 4 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/559a7048/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 8f3c5be..75ef071 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -446,7 +446,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                      if (!queueNames.isEmpty()) {
                         final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
                         if (convertedQueueNames != queueNames) {
-                           result = new BindingQueryResult(result.isExists(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
+                           result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
                         }
                      }
                   }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/559a7048/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
index 7eed31f..7f340b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server;
 import java.util.List;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 
 public class BindingQueryResult {
 
@@ -34,12 +35,17 @@ public class BindingQueryResult {
 
    private int defaultMaxConsumers;
 
+   private final AddressInfo addressInfo;
+
    public BindingQueryResult(final boolean exists,
+                             final AddressInfo addressInfo,
                              final List<SimpleString> queueNames,
                              final boolean autoCreateQueues,
                              final boolean autoCreateAddresses,
                              final boolean defaultPurgeOnNoConsumers,
                              final int defaultMaxConsumers) {
+      this.addressInfo = addressInfo;
+
       this.exists = exists;
 
       this.queueNames = queueNames;
@@ -57,6 +63,10 @@ public class BindingQueryResult {
       return exists;
    }
 
+   public AddressInfo getAddressInfo() {
+      return addressInfo;
+   }
+
    public boolean isAutoCreateQueues() {
       return autoCreateQueues;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/559a7048/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 587cff9..8077ae9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -852,7 +852,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       SimpleString bindAddress = new SimpleString(realAddress);
       if (managementService != null) {
          if (bindAddress.equals(managementService.getManagementAddress())) {
-            return new BindingQueryResult(true, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
+            return new BindingQueryResult(true, null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
          }
       }
 
@@ -868,7 +868,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
-      return new BindingQueryResult(getAddressInfo(bindAddress) != null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
+      AddressInfo info = getAddressInfo(bindAddress);
+
+      return new BindingQueryResult(info != null, info, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/559a7048/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
index e10c73d..7f0fdd8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java
@@ -40,6 +40,11 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
    SimpleString queue1 = new SimpleString("queue1");
    SimpleString queue2 = new SimpleString("queue2");
 
+   @Override
+   protected boolean isAutoCreateQueues() {
+      return false;
+   }
+
    @Test(timeout = 60000)
    public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
       server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
@@ -187,6 +192,8 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport  {
 
    @Test(timeout = 60000)
    public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception {
+      // This test needs auto-create.. for that just clear the settings and use defaults
+      server.getAddressSettingsRepository().clear();
       AddressSettings settings = new AddressSettings();
       settings.setAutoCreateAddresses(true);
       server.getAddressSettingsRepository().addMatch(address.toString(), settings);

Reply | Threaded
Open this post in threaded view
|

[5/5] activemq-artemis git commit: This closes #1639

clebertsuconic-2
In reply to this post by clebertsuconic-2
This closes #1639


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/07eb1d25
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/07eb1d25
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/07eb1d25

Branch: refs/heads/master
Commit: 07eb1d25bea01217c6764ede15310eb449053035
Parents: b0c8307 559a704
Author: Clebert Suconic <[hidden email]>
Authored: Fri Nov 3 23:51:07 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Nov 3 23:51:07 2017 -0400

----------------------------------------------------------------------
 .../jms/client/ActiveMQMessageProducer.java     |  11 +-
 .../amqp/broker/AMQPSessionCallback.java        |  32 ++--
 .../proton/ProtonServerReceiverContext.java     |  25 ++-
 .../core/ServerSessionPacketHandler.java        |   2 +-
 .../artemis/core/server/BindingQueryResult.java |  10 ++
 .../core/server/impl/ActiveMQServerImpl.java    |   6 +-
 .../amqp/BrokerDefinedAnycastConsumerTest.java  |   7 +
 .../integration/amqp/QueueAutoCreationTest.java | 161 +++++++++++++++++++
 .../LargeMessageQueueAutoCreationTest.java      | 158 ++++++++++++++++++
 .../jtests/jms/framework/PubSubTestCase.java    |   3 +-
 10 files changed, 394 insertions(+), 21 deletions(-)
----------------------------------------------------------------------