[activemq-artemis] branch master updated: ARTEMIS-2592 Fixing DeadLock between deleteMessages and depage

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] branch master updated: ARTEMIS-2592 Fixing DeadLock between deleteMessages and depage

jbertram
This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new e397a17  ARTEMIS-2592 Fixing DeadLock between deleteMessages and depage
     new d6331b0  This closes #2934
e397a17 is described below

commit e397a177967bcf431b9fa82463a61b152ba82fb0
Author: Clebert Suconic <[hidden email]>
AuthorDate: Thu Jan 9 12:15:04 2020 -0500

    ARTEMIS-2592 Fixing DeadLock between deleteMessages and depage
   
    This was happening through purge
---
 .../org/apache/activemq/artemis/utils/Wait.java    |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        |   7 ++
 .../artemis/core/server/impl/QueueManagerImpl.java |   4 +
 .../integration/addressing/AddressingTest.java     |   2 +-
 .../integration/client/AutoDeleteAddressTest.java  |   3 +-
 .../integration/client/AutoDeleteQueueTest.java    |   3 +-
 .../paging/TestDeadlockOnPurgePagingTest.java      | 136 +++++++++++++++++++++
 .../artemis/tests/integration/stomp/StompTest.java |   4 +-
 8 files changed, 156 insertions(+), 5 deletions(-)

diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
index 870fbf2..6cb011b 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
@@ -27,7 +27,7 @@ public class Wait {
 
 
    public static final long MAX_WAIT_MILLIS = 30 * 1000;
-   public static final int SLEEP_MILLIS = 1000;
+   public static final int SLEEP_MILLIS = 100;
    public static final String DEFAULT_FAILURE_MESSAGE = "Condition wasn't met";
 
    public interface Condition {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 212a68e..d03494a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1949,6 +1949,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                                       QueueIterateAction messageAction) throws Exception {
       int count = 0;
       int txCount = 0;
+      // This is to avoid scheduling depaging while iterQueue is happening
+      // this should minimize the use of the paged executor.
+      depagePending = true;
 
       depageLock.lock();
 
@@ -2037,6 +2040,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          return count;
       } finally {
          depageLock.unlock();
+         // to resume flow of depages, just in case
+         // as we disabled depaging during the execution of this method
+         depagePending = false;
+         forceDelivery();
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index eab8771..b20b537 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -93,6 +93,10 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
    }
 
    public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
+      // This needs to be an executor
+      // otherwise we may have dead-locks in certain cases such as failure,
+      // where consumers are closed after callbacks
+      super(server.getExecutorFactory().getExecutor());
       this.server = server;
       this.queueName = queueName;
       this.setTask(this::doIt);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 37e365b..5a905b8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -246,7 +246,7 @@ public class AddressingTest extends ActiveMQTestBase {
       consumer.close();
       // the last consumer was closed so the queue should exist but be purged
       assertNotNull(server.locateQueue(queueName));
-      assertEquals(0, queue.getMessageCount());
+      Wait.assertEquals(0, queue::getMessageCount);
 
       // there are no consumers so no messages should be routed to the queue
       producer.send(session.createMessage(true));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java
index 5242de2..ee78bda 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,7 +53,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
       server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
       assertNotNull(server.getAddressInfo(addressA));
       cf.createSession().createConsumer(queueA).close();
-      assertNull(server.getAddressInfo(addressA));
+      Wait.assertTrue(() -> server.getAddressInfo(addressA) == null);
    }
 
    @Test
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
index 0f9c09e..8603e7b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,7 +53,7 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
       server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
       assertNotNull(server.locateQueue(queueA));
       cf.createSession().createConsumer(queueA).close();
-      assertNull(server.locateQueue(queueA));
+      Wait.assertTrue(() -> server.locateQueue(queueA) == null);
    }
 
    @Test
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/TestDeadlockOnPurgePagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/TestDeadlockOnPurgePagingTest.java
new file mode 100644
index 0000000..425a2ad
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/TestDeadlockOnPurgePagingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDeadlockOnPurgePagingTest extends ActiveMQTestBase {
+
+   private static final Logger logger = Logger.getLogger(TestDeadlockOnPurgePagingTest.class);
+
+   protected ServerLocator locator;
+   protected ActiveMQServer server;
+   protected ClientSessionFactory sf;
+   static final int MESSAGE_SIZE = 1024; // 1k
+   static final int LARGE_MESSAGE_SIZE = 100 * 1024;
+
+   protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+   protected static final int RECEIVE_TIMEOUT = 5000;
+
+   protected static final int PAGE_MAX = 100 * 1024;
+
+   protected static final int PAGE_SIZE = 10 * 1024;
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      locator = createInVMNonHALocator();
+   }
+
+   @Test
+   public void testDeadlockOnPurge() throws Exception {
+
+      int NUMBER_OF_MESSAGES = 5000;
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, TestDeadlockOnPurgePagingTest.PAGE_SIZE, TestDeadlockOnPurgePagingTest.PAGE_MAX);
+
+      server.start();
+
+      String queue = "purgeQueue";
+      SimpleString ssQueue = new SimpleString(queue);
+      server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+      QueueImpl purgeQueue = (QueueImpl) server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
+      Connection connection = cf.createConnection();
+
+      connection.start();
+
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue jmsQueue = session.createQueue(queue);
+
+      MessageProducer producer = session.createProducer(jmsQueue);
+
+      for (int i = 0; i < 100; i++) {
+         producer.send(session.createTextMessage("hello" + i));
+      }
+      session.commit();
+
+      Wait.assertEquals(0, purgeQueue::getMessageCount);
+
+      Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize);
+
+      MessageConsumer consumer = session.createConsumer(jmsQueue);
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES / 5; i++) {
+         producer.send(session.createTextMessage("hello" + i));
+         if (i == 10) {
+            purgeQueue.getPageSubscription().getPagingStore().startPaging();
+         }
+
+         if (i > 10 && i % 10 == 0) {
+            purgeQueue.getPageSubscription().getPagingStore().forceAnotherPage();
+         }
+      }
+      session.commit();
+
+
+      for (int i = 0; i < 100; i++) {
+         Message message = consumer.receive(5000);
+         Assert.assertNotNull(message);
+      }
+      session.commit();
+
+      consumer.close();
+
+      Wait.assertEquals(0L, purgeQueue::getMessageCount, 5000L, 10L);
+
+      Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging, 5000L, 10L);
+
+      Wait.assertEquals(0L, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize, 5000L, 10L);
+   }
+
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 3bc332b..73d74c0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -306,7 +306,7 @@ public class StompTest extends StompTestBase {
       // closing the consumer here should trigger auto-deletion
       assertNotNull(server.getPostOffice().getBinding(new SimpleString(queue)));
       consumer.close();
-      assertNull(server.getPostOffice().getBinding(new SimpleString(queue)));
+      Wait.assertTrue(() -> server.getPostOffice().getBinding(new SimpleString(queue)) == null);
    }
 
    @Test
@@ -1662,6 +1662,8 @@ public class StompTest extends StompTestBase {
 
       unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
 
+      Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(ADDRESS)) == null);
+
       // now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription
       uuid = UUID.randomUUID().toString();
       frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)