activemq git commit: AMQ-7067 - track xa commit outcomes in ack compaction such that there are no dangling prepared tx on full recovery, fix and test

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

activemq git commit: AMQ-7067 - track xa commit outcomes in ack compaction such that there are no dangling prepared tx on full recovery, fix and test

jgenender
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x cee7f014f -> f7ff4c25e


AMQ-7067 - track xa commit outcomes in ack compaction such that there are no dangling prepared tx on full recovery, fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f7ff4c25
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f7ff4c25
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f7ff4c25

Branch: refs/heads/activemq-5.15.x
Commit: f7ff4c25e13133132c3103141a5179c93e43b536
Parents: cee7f01
Author: gtully <[hidden email]>
Authored: Wed Oct 17 18:13:36 2018 +0100
Committer: Jeff Genender <[hidden email]>
Committed: Wed Oct 17 11:34:46 2018 -0600

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  17 +++-
 .../org/apache/activemq/bugs/AMQ7067Test.java   | 100 +++++++++++++++++++
 2 files changed, 116 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f7ff4c25/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 188b021..d3179e1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2105,7 +2105,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     LOG.trace("Error loading command during ack forward: {}", nextLocation);
                 }
 
-                if (command != null && command instanceof KahaRemoveMessageCommand) {
+                if (shouldForward(command)) {
                     payload = toByteSequence(command);
                     Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
                     updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
@@ -2141,6 +2141,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
     }
 
+    private boolean shouldForward(JournalCommand<?> command) { // decides whether the ack-forward loop re-stores this journal command
+        boolean result = false; // default: not forwarded (this is also the answer for a null command)
+        if (command != null) {
+            if (command instanceof KahaRemoveMessageCommand) { // remove commands: the only kind forwarded before this change
+                result = true;
+            } else if (command instanceof KahaCommitCommand) {
+                KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
+                if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) { // only commits that carry an XA transaction id qualify
+                    result = true;
+                }
+            }
+        }
+        return result;
+    }
+
     private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
         //getNextLocation() can throw an IOException, we should handle it and set
         //nextLocation to null and abort gracefully

http://git-wip-us.apache.org/repos/asf/activemq/blob/f7ff4c25/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
index d00ee41..18b4b41 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
@@ -134,6 +134,106 @@ public class AMQ7067Test {
     }
 
     @Test
+    public void testXACommitWithAckCompactionDoesNotLooseOutcomeOnFullRecovery() throws Exception { // commit variant of the shared scenario
+        doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(true); // true = complete the prepared tx with commit
+    }
+
+    @Test
+    public void testXARollbackWithAckCompactionDoesNotLooseOutcomeOnFullRecovery() throws Exception { // rollback variant of the shared scenario
+        doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(false); // false = complete the prepared tx with rollback
+    }
+
+    protected void doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(boolean commit) throws Exception { // commit: true -> commit the prepared XA tx, false -> roll it back; recover() must return no xids both before and after a broker restart
+
+        ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setCompactAcksAfterNoGC(2); // this setting is read back below to size the checkpoint loop
+        // investigate liner gc issue - store usage not getting released
+        org.apache.log4j.Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
+
+
+        setupXAConnection();
+
+        Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
+
+        MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb);
+
+        XATransactionId txid = createXATransaction();
+        System.out.println("****** create new txid = " + txid);
+        xaRes.start(txid, TMNOFLAGS);
+
+        TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage); // sent between start(txid) and end(txid), i.e. as part of the XA branch
+        xaRes.end(txid, TMSUCCESS);
+
+        Queue queue = xaSession.createQueue("test");
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024); // presumably 100 messages of 512 KiB each - TODO confirm against produce()
+        ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+        xaRes.prepare(txid); // txid is prepared here; its outcome is only delivered further down
+
+        // hold onto data file with prepare record
+        produce(xaRes, xaSession, holdKahaDb, 1, 10);
+
+        produce(xaRes, xaSession, queue, 50, 512 * 1024);
+        ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+        Wait.waitFor(new Wait.Condition() { // NOTE(review): the result of waitFor is not checked - confirm a timeout cannot go unnoticed here
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        if (commit) { // deliver the outcome for the tx prepared above
+            xaRes.commit(txid, false); // presumably onePhase=false since the tx was already prepared - confirm against the XAResource API
+        } else {
+            xaRes.rollback(txid);
+        }
+
+        produce(xaRes, xaSession, queue, 50, 512 * 1024);
+        ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+
+        Wait.waitFor(new Wait.Condition() { // NOTE(review): as above, the result of waitFor is not checked
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1; // one more checkpoint than the configured setting
+        // force gc, n data files requires n cycles
+        for (int dataFilesToMove = 0; dataFilesToMove < 4; dataFilesToMove++) {
+            for (int i = 0; i < limit; i++) {
+                broker.getPersistenceAdapter().checkpoint(true);
+            }
+            // ack compaction task operates in the background
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+
+        Xid[] xids = xaRes.recover(TMSTARTRSCAN);
+
+        //Should be 0 since we have delivered the outcome
+        assertEquals(0, xids.length);
+        connection.close();
+
+        // need full recovery to see lost commit record
+        curruptIndexFile(getDataDirectory());
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+
+        setupXAConnection();
+        xids = xaRes.recover(TMSTARTRSCAN); // same check again, now against the restarted broker
+
+        System.out.println("****** recovered = " + xids); // NOTE(review): concatenating an array prints its identity string, not the recovered xids
+
+        assertEquals(0, xids.length);
+    }
+
+    @Test
     public void testXAcommit() throws Exception {
 
         setupXAConnection();