[activemq-artemis] branch master updated: ARTEMIS-2622 Making replica resilient to closed pages

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] branch master updated: ARTEMIS-2622 Making replica resilient to closed pages

jbertram
This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 990890d  ARTEMIS-2622 Making replica resilient to closed pages
     new b588f61  This closes #2983
990890d is described below

commit 990890d228f9fa2aec4ee113b6469f5fd309dfca
Author: Clebert Suconic <[hidden email]>
AuthorDate: Fri Feb 14 13:09:33 2020 -0500

    ARTEMIS-2622 Making replica resilient to closed pages
---
 .../core/replication/ReplicationEndpoint.java      |  7 +-
 .../artemis/core/server/ActiveMQMessageBundle.java |  2 +-
 .../failover/ReplicatedPagedFailoverTest.java      | 98 +++++++++++++++++++++-
 3 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 33a39e3..2a49ec3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -728,7 +728,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       Page page = pages.remove(packet.getPageNumber());
 
       if (page == null) {
-         page = getPage(packet.getStoreName(), packet.getPageNumber());
+         // if page is null, we create the instance and include it in the map,
+         // then recurse into this call so that page.delete or page.close
+         // will not leave any closed objects in the hashmap
+         getPage(packet.getStoreName(), packet.getPageNumber());
+         handlePageEvent(packet);
+         return;
       }
 
       if (page != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 69249c2..2e1c7c2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -447,7 +447,7 @@ public interface ActiveMQMessageBundle {
    @Message(id = 229216, value = "Invalid queue name: {0}", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQIllegalStateException invalidQueueName(SimpleString queueName);
 
-   @Message(id = 119217, value = "Can't write to closed file: {0}", format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 119217, value = "Cannot write to closed file: {0}", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQIOErrorException cannotWriteToClosedFile(SequentialFile file);
 
    @Message(id = 229218, value = "Failed to locate broker configuration URL")
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
index d0d9f0f..0797590 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
@@ -18,10 +18,19 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
 
 import java.util.HashMap;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest {
@@ -37,6 +46,93 @@ public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest {
    @Override
    @Test
    public void testFailWithBrowser() throws Exception {
-      // paged messages are not available for browsing
+      internalBrowser(0);
+   }
+
+   @Test
+   public void testFailWithBrowserWithClose() throws Exception {
+      internalBrowser(1);
+   }
+
+   @Test
+   public void testFailWithBrowserWithDelete() throws Exception {
+      internalBrowser(2);
+   }
+
+   // temperMode:
+   // 0 - no tampering
+   // 1 - report pages as closed (StorageManager.pageClosed)
+   // 2 - report pages as deleted (StorageManager.pageDeleted)
+   private void internalBrowser(int temperMode) throws Exception {
+      int numMessages = 50;
+      int messagesPerPage = 10;
+      int iterations = 10;
+      createSessionFactory();
+      ClientSession session = createSession(sf, true, true);
+
+      session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, false);
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      Queue queue = liveServer.getServer().locateQueue(FailoverTest.ADDRESS);
+
+      for (int j = 0; j < iterations; j++) {
+         System.err.println("#iteration " + j);
+         queue.getPageSubscription().getPagingStore().startPaging();
+         Assert.assertNotNull(queue);
+
+         for (int i = 0; i < numMessages; i++) {
+            // some are durable, some are not!
+            producer.send(createMessage(session, i, i % 2 == 0));
+            if (i > 0 && i % messagesPerPage == 0) {
+               queue.getPageSubscription().getPagingStore().forceAnotherPage();
+            }
+         }
+
+         ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
+
+         session.start();
+
+         while (true) {
+            ClientMessage msg = consumer.receive(500);
+            if (msg == null) {
+               break;
+            }
+         }
+         consumer.close();
+
+         PagingStore store = queue.getPageSubscription().getPagingStore();
+
+         if (temperMode == 1) {
+            // this is tampering with the system, causing an artificial issue. The system should still heal itself.
+            for (long pageID = store.getFirstPage(); pageID <= store.getCurrentPage().getPageId() + 10; pageID++) {
+               System.out.println("Sending close on " + pageID);
+               liveServer.getServer().getStorageManager().pageClosed(store.getStoreName(), (int) pageID);
+            }
+         }  else if (temperMode == 2) {
+            // this is tampering with the system, causing an artificial issue. The system should still heal itself.
+            for (long pageID = store.getFirstPage(); pageID <= store.getCurrentPage().getPageId() + 10; pageID++) {
+               System.out.println("Sending close on " + pageID);
+               liveServer.getServer().getStorageManager().pageDeleted(store.getStoreName(), (int) pageID);
+            }
+         }
+         store.getFirstPage();
+         store.getCurrentPage().getPageId();
+
+         consumer = session.createConsumer(FailoverTestBase.ADDRESS, false);
+         session.start();
+
+         while (true) {
+            ClientMessage msg = consumer.receive(500);
+            if (msg == null) {
+               break;
+            }
+            msg.acknowledge();
+         }
+         consumer.close();
+
+         Wait.assertFalse(queue.getPageSubscription().getPagingStore()::isPaging);
+      }
+
    }
 }