activemq git commit: [AMQ-6771] do linear sequential scan of journal when validating checksums - remove batch reads via seek/read which depend on write batch size

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

activemq git commit: [AMQ-6771] do linear sequential scan of journal when validating checksums - remove batch reads via seek/read which depend on write batch size

gtully-2
Repository: activemq
Updated Branches:
  refs/heads/master 56bed30c6 -> 8c218ee05


[AMQ-6771] do linear sequential scan of journal when validating checksums - remove batch reads via seek/read which depend on write batch size


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8c218ee0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8c218ee0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8c218ee0

Branch: refs/heads/master
Commit: 8c218ee05d2529e62bb1792b20d22e7086169e6a
Parents: 56bed30
Author: gtully <[hidden email]>
Authored: Mon Jul 17 12:18:25 2017 +0100
Committer: gtully <[hidden email]>
Committed: Mon Jul 17 12:18:25 2017 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/util/ByteSequence.java  |  22 +++-
 .../activemq/util/DataByteArrayInputStream.java |   4 +-
 .../activemq/store/kahadb/MessageDatabase.java  |   3 +-
 .../kahadb/disk/journal/DataFileAccessor.java   |  37 +-----
 .../store/kahadb/disk/journal/Journal.java      | 125 +++++++++++--------
 .../util/RecoverableRandomAccessFile.java       |   2 +-
 .../JournalCorruptionEofIndexRecoveryTest.java  |  23 +++-
 .../store/kahadb/JournalFdRecoveryTest.java     |  37 +++++-
 8 files changed, 158 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
index 2699856..ac1e01a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ByteSequence.java
@@ -50,6 +50,8 @@ public class ByteSequence {
         return offset;
     }
 
+    public int remaining() { return length - offset; }
+
     public void setData(byte[] data) {
         this.data = data;
     }
@@ -71,8 +73,14 @@ public class ByteSequence {
         }
     }
 
+    public void reset() {
+        length = remaining();
+        System.arraycopy(data, offset, data, 0, length);
+        offset = 0;
+    }
+
     public int indexOf(ByteSequence needle, int pos) {
-        int max = length - needle.length;
+        int max = length - needle.length - offset;
         for (int i = pos; i < max; i++) {
             if (matches(needle, i)) {
                 return i;
@@ -102,4 +110,16 @@ public class ByteSequence {
         }
         return -1;
     }
+
+    public boolean startsWith(final byte[] bytes) {
+        if (length - offset < bytes.length) {
+            return false;
+        }
+        for (int i = 0; i<bytes.length; i++) {
+            if (data[offset+i] != bytes[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
index 2fe92a1..3b42c9f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/DataByteArrayInputStream.java
@@ -65,6 +65,8 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
         return pos - offset;
     }
 
+    public int position() { return pos; }
+
     /**
      * @return the underlying data array
      */
@@ -224,7 +226,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
     }
 
     public long readLong() throws IOException {
-        if (pos >= buf.length ) {
+        if (pos + 8 >= buf.length ) {
             throw new EOFException();
         }
         long rc = ((long)buf[pos++] << 56) + ((long)(buf[pos++] & 255) << 48) + ((long)(buf[pos++] & 255) << 40) + ((long)(buf[pos++] & 255) << 32);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3321b35..40e8f95 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -480,6 +480,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
             IOHelper.mkdirs(directory);
             if (deleteAllMessages) {
+                getJournal().setCheckForCorruptionOnStartup(false);
                 getJournal().start();
                 getJournal().delete();
                 getJournal().close();
@@ -1048,7 +1049,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     private Location getNextInitializedLocation(Location location) throws IOException {
         Location mayNotBeInitialized = journal.getNextLocation(location);
-        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
+        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
             // need to init size and type to skip
             return journal.getNextLocation(mayNotBeInitialized);
         } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 71c2195..548d3b1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Map;
 
 import org.apache.activemq.util.ByteSequence;
@@ -115,38 +116,6 @@ final class DataFileAccessor {
         }
     }
 
-//    public boolean readLocationDetailsAndValidate(Location location) {
-//        try {
-//            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
-//            if (asyncWrite != null) {
-//                location.setSize(asyncWrite.location.getSize());
-//                location.setType(asyncWrite.location.getType());
-//            } else {
-//                file.seek(location.getOffset());
-//                location.setSize(file.readInt());
-//                location.setType(file.readByte());
-//
-//                byte data[] = new byte[3];
-//                file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
-//                file.readFully(data);
-//                if (data[0] != Journal.ITEM_HEAD_SOR[0]
-//                    || data[1] != Journal.ITEM_HEAD_SOR[1]
-//                    || data[2] != Journal.ITEM_HEAD_SOR[2]) {
-//                    return false;
-//                }
-//                file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
-//                file.readFully(data);
-//                if (data[0] != Journal.ITEM_HEAD_EOR[0]
-//                    || data[1] != Journal.ITEM_HEAD_EOR[1]
-//                    || data[2] != Journal.ITEM_HEAD_EOR[2]) {
-//                    return false;
-//                }
-//            }
-//        } catch (IOException e) {
-//            return false;
-//        }
-//        return true;
-//    }
 
     public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
 
@@ -157,4 +126,8 @@ final class DataFileAccessor {
             file.sync();
         }
     }
+
+    public RecoverableRandomAccessFile getRaf() {
+        return file;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index a78cc65..5edee92 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -90,12 +89,21 @@ public class Journal {
         // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
-            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
-            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
+            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+            randomAccessFile.seek(recoveryPosition.getOffset() + 1);
+            byte[] data = new byte[getWriteBatchSize()];
+            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
+            int nextOffset = 0;
+            if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+                nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
+            } else {
+                nextOffset = Math.toIntExact(randomAccessFile.length());
+            }
+            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
             LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
 
             // skip corruption on getNextLocation
-            recoveryPosition.setOffset((int) sequence.getLast() + 1);
+            recoveryPosition.setOffset(nextOffset);
             recoveryPosition.setSize(-1);
 
             dataFile.corruptedBlocks.add(sequence);
@@ -463,21 +471,19 @@ public class Journal {
     }
 
     public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
-        int firstBatchRecordSize = -1;
         if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
-            Location location = new Location();
-            location.setDataFileId(dataFile.getDataFileId());
-            location.setOffset(0);
-
             DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
             try {
-                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
+                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
+                reader.readFully(0, firstFewBytes);
+                ByteSequence bs = new ByteSequence(firstFewBytes);
+                return bs.startsWith(EOF_RECORD);
             } catch (Exception ignored) {
             } finally {
                 accessorPool.closeDataFileAccessor(reader);
             }
         }
-        return firstBatchRecordSize == 0;
+        return false;
     }
 
     protected Location recoveryCheck(DataFile dataFile) throws IOException {
@@ -487,9 +493,15 @@ public class Journal {
 
         DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
+            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
+            randomAccessFile.seek(0);
+            final long totalFileLength = randomAccessFile.length();
+            byte[] data = new byte[getWriteBatchSize()];
+            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
+
             while (true) {
-                int size = checkBatchRecord(reader, location.getOffset());
-                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+                int size = checkBatchRecord(bs, randomAccessFile);
+                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                     if (size == 0) {
                         // eof batch record
                         break;
@@ -500,8 +512,8 @@ public class Journal {
                     // Perhaps it's just some corruption... scan through the
                     // file to find the next valid batch record. We
                     // may have subsequent valid batch records.
-                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
-                    if (nextOffset >= 0) {
+                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
+                        int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
                         Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                         LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                         dataFile.corruptedBlocks.add(sequence);
@@ -533,41 +545,33 @@ public class Journal {
         return location;
     }
 
-    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
-        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
-        byte data[] = new byte[1024*4];
-        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
-
+    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
+        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
         int pos = 0;
         while (true) {
-            pos = bs.indexOf(header, pos);
+            pos = bs.indexOf(header, 0);
             if (pos >= 0) {
-                return offset + pos;
+                bs.setOffset(bs.offset + pos);
+                return pos;
             } else {
                 // need to load the next data chunck in..
-                if (bs.length != data.length) {
+                if (bs.length != bs.data.length) {
                     // If we had a short read then we were at EOF
                     return -1;
                 }
-                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
-                bs = new ByteSequence(data, 0, reader.read(offset, data));
-                pos = 0;
+                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
+                bs.reset();
+                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
             }
         }
     }
 
-    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
-        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
 
-        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {
-
-            reader.readFully(offset, controlRecord);
-
-            // check for journal eof
-            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
-                // eof batch
-                return 0;
-            }
+        if (bs.startsWith(EOF_RECORD)) {
+            return 0; // eof
+        }
+        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
 
             // Assert that it's a batch record.
             for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
@@ -578,28 +582,43 @@ public class Journal {
 
             int size = controlIs.readInt();
             if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
-                return -1;
+                return -2;
             }
 
-            if (isChecksum()) {
-
-                long expectedChecksum = controlIs.readLong();
-                if (expectedChecksum == 0) {
-                    // Checksuming was not enabled when the record was stored.
-                    // we can't validate the record :(
-                    return size;
-                }
-
-                byte data[] = new byte[size];
-                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
+            long expectedChecksum = controlIs.readLong();
+            Checksum checksum = null;
+            if (isChecksum() && expectedChecksum > 0) {
+                checksum = new Adler32();
+            }
 
-                Checksum checksum = new Adler32();
-                checksum.update(data, 0, data.length);
+            // revert to bs to consume data
+            bs.setOffset(controlIs.position());
+            int toRead = size;
+            while (toRead > 0) {
+                if (bs.remaining() >= toRead) {
+                    if (checksum != null) {
+                        checksum.update(bs.getData(), bs.getOffset(), toRead);
+                    }
+                    bs.setOffset(bs.offset + toRead);
+                    toRead = 0;
+                } else {
+                    if (bs.length != bs.data.length) {
+                        // buffer exhausted
+                        return  -3;
+                    }
 
-                if (expectedChecksum != checksum.getValue()) {
-                    return -1;
+                    toRead -= bs.remaining();
+                    if (checksum != null) {
+                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
+                    }
+                    bs.setLength(reader.read(bs.data));
+                    bs.setOffset(0);
                 }
             }
+            if (checksum != null && expectedChecksum != checksum.getValue()) {
+                return -4;
+            }
+
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
index 636890e..171f0c9 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
@@ -44,7 +44,7 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
         raf = new RandomAccessFile(file, mode);
     }
 
-    protected RandomAccessFile getRaf() throws IOException {
+    public RandomAccessFile getRaf() throws IOException {
         if (raf == null) {
             raf = new RandomAccessFile(file, mode);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index 221b087..16598ea 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -64,10 +64,12 @@ public class JournalCorruptionEofIndexRecoveryTest {
     private String connectionUri;
     private KahaDBPersistenceAdapter adapter;
     private boolean ignoreMissingJournalFiles = false;
+    private int journalMaxBatchSize;
 
     private final Destination destination = new ActiveMQQueue("Test");
     private final String KAHADB_DIRECTORY = "target/activemq-data/";
     private final String payload = new String(new byte[1024]);
+    File brokerDataDir = null;
 
     protected void startBroker() throws Exception {
         doStartBroker(true, false);
@@ -78,14 +80,13 @@ public class JournalCorruptionEofIndexRecoveryTest {
     }
 
     protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) throws Exception {
-        File dataDir = broker.getPersistenceAdapter().getDirectory();
         if (broker != null) {
             broker.stop();
             broker.waitUntilStopped();
         }
 
         if (whackIndex) {
-            File indexToDelete = new File(dataDir, "db.data");
+            File indexToDelete = new File(brokerDataDir, "db.data");
             LOG.info("Whacking index: " + indexToDelete);
             indexToDelete.delete();
         }
@@ -113,6 +114,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         cf = new ActiveMQConnectionFactory(connectionUri);
 
         broker.start();
+        brokerDataDir = broker.getPersistenceAdapter().getDirectory();
         LOG.info("Starting broker..");
     }
 
@@ -124,6 +126,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
         // ensure there are a bunch of data files but multiple entries in each
         adapter.setJournalMaxFileLength(1024 * 20);
 
+        adapter.setJournalMaxWriteBatchSize(journalMaxBatchSize);
+
         // speed up the test case, checkpoint an cleanup early and often
         adapter.setCheckpointInterval(5000);
         adapter.setCleanupInterval(5000);
@@ -146,6 +150,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
     @Before
     public void reset() throws Exception {
         ignoreMissingJournalFiles = true;
+        journalMaxBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     }
 
     @Test
@@ -234,6 +239,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
         assertEquals("Drain", numToSend, drainQueue(numToSend));
     }
 
+    @Test
+    public void testRecoverIndexWithSmallBatch() throws Exception {
+        journalMaxBatchSize = 2 * 1024;
+        startBroker();
+
+        final int numToSend = 4;
+        produceMessagesToConsumeMultipleDataFiles(numToSend);
+
+        // force journal replay by whacking the index
+        restartBroker(false, true);
+
+        assertEquals("Drain", numToSend, drainQueue(numToSend));
+    }
+
 
     @Test
     public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c218ee0/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
index 633ab5c..ffe8ab6 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ import javax.management.Attribute;
 import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -54,8 +57,7 @@ public class JournalFdRecoveryTest {
     private static final Logger LOG = LoggerFactory.getLogger(JournalFdRecoveryTest.class);
 
     private final String KAHADB_DIRECTORY = "target/activemq-data/";
-    private final String payload = new String(new byte[1024]);
-
+    private String payload;
     private ActiveMQConnectionFactory cf = null;
     private BrokerService broker = null;
     private final Destination destination = new ActiveMQQueue("Test");
@@ -63,6 +65,7 @@ public class JournalFdRecoveryTest {
     private KahaDBPersistenceAdapter adapter;
 
     public byte fill = Byte.valueOf("3");
+    private int maxJournalSizeBytes;
 
     protected void startBroker() throws Exception {
         doStartBroker(true);
@@ -88,7 +91,6 @@ public class JournalFdRecoveryTest {
     }
 
     private void doCreateBroker(boolean delete) throws Exception {
-
         broker = new BrokerService();
         broker.setDeleteAllMessagesOnStartup(delete);
         broker.setPersistent(true);
@@ -112,7 +114,7 @@ public class JournalFdRecoveryTest {
         adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
 
         // ensure there are a bunch of data files but multiple entries in each
-        adapter.setJournalMaxFileLength(1024 * 20);
+        adapter.setJournalMaxFileLength(maxJournalSizeBytes);
 
         // speed up the test case, checkpoint an cleanup early and often
         adapter.setCheckpointInterval(5000);
@@ -132,6 +134,12 @@ public class JournalFdRecoveryTest {
         }
     }
 
+    @Before
+    public void initPayLoad() {
+        payload = new String(new byte[1024]);
+        maxJournalSizeBytes = 1024 * 20;
+    }
+
 
     @Test
     public void testStopOnPageInIOError() throws Exception {
@@ -236,6 +244,27 @@ public class JournalFdRecoveryTest {
 
     }
 
+    @Test
+    public void testRecoveryCheckSpeedSmallMessages() throws Exception {
+        maxJournalSizeBytes = Journal.DEFAULT_MAX_FILE_LENGTH;
+        doCreateBroker(true);
+        broker.start();
+
+        int toSend = 20000;
+        payload = new String(new byte[100]);
+        produceMessagesToConsumeMultipleDataFiles(toSend);
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        Instant b = Instant.now();
+        doStartBroker(false);
+        Instant e = Instant.now();
+
+        Duration timeElapsed = Duration.between(b, e);
+        LOG.info("Elapsed: " + timeElapsed);
+    }
+
     private long totalOpenFileDescriptorCount(BrokerService broker) {
         long result = 0;
         try {

Loading...