[activemq] branch activemq-5.15.x updated (20493fd -> 602e382)

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

[activemq] branch activemq-5.15.x updated (20493fd -> 602e382)

cshannon
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a change to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git.


    from 20493fd  AMQ-7097 Update Qpid JMS and Netty to latest
     new 24b5944  AMQ-7129 - Properly recover messages from KahaDB for a durable when there are messages to recover before the stored lastAck value
     new 602e382  AMQ-7129 - minor junit test fix

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  37 ++-
 .../activemq/store/kahadb/MessageDatabase.java     |   9 +
 .../kahadb/KahaDBDurableMessageRecoveryTest.java   | 350 +++++++++++++++++++++
 3 files changed, 394 insertions(+), 2 deletions(-)
 create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java

Reply | Threaded
Open this post in threaded view
|

[activemq] 01/02: AMQ-7129 - Properly recover messages from KahaDB for a durable when there are messages to recover before the stored lastAck value

cshannon
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 24b5944ecbf315866ce227862fb2d264f5b1654f
Author: Christopher L. Shannon (cshannon) <[hidden email]>
AuthorDate: Wed Jan 9 12:51:03 2019 -0500

    AMQ-7129 - Properly recover messages from KahaDB for a durable when there are
    messages to recover before the stored lastAck value
   
    With individual ack mode we need to check the durable ackPosition
    sequence set in the KahaDB index on subscription load to see if there are
    earlier messages before the lastAck value that still haven't been acked.
    While this normally wouldn't happen, it is possible in individual ack
    mode.
   
    (cherry picked from commit 25de20c77ec0bf6cdc699cac2ad50e34ec707453)
---
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  37 ++-
 .../activemq/store/kahadb/MessageDatabase.java     |   9 +
 .../kahadb/KahaDBDurableMessageRecoveryTest.java   | 350 +++++++++++++++++++++
 3 files changed, 394 insertions(+), 2 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index c552d79..cbbf9b6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -79,6 +79,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -1001,7 +1002,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
-                        sd.orderIndex.setBatch(tx, cursorPos);
+                        SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
+                        //If we have ackPositions tracked then compare the first one as individual acknowledge mode
+                        //may have bumped lastAck even though there are earlier messages to still consume
+                        if (subAckPositions != null && !subAckPositions.isEmpty()
+                                && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) {
+                            //we have messages to ack before lastAckedSequence
+                            sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
+                        } else {
+                            subAckPositions = null;
+                            sd.orderIndex.setBatch(tx, cursorPos);
+                        }
                         recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
@@ -1009,6 +1020,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
+                            //If subAckPositions is set then verify the sequence set contains the message still
+                            //and if it doesn't skip it
+                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
+                                continue;
+                            }
                             listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
                         sd.orderIndex.resetCursorPosition();
@@ -1033,13 +1049,24 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                         StoredDestination sd = getStoredDestination(dest, tx);
                         sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
+                        SequenceSet subAckPositions = null;
                         if (moc == null) {
                             LastAck pos = getLastAck(tx, sd, subscriptionKey);
                             if (pos == null) {
                                 // sub deleted
                                 return;
                             }
-                            sd.orderIndex.setBatch(tx, pos);
+                            subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
+                            //If we have ackPositions tracked then compare the first one as individual acknowledge mode
+                            //may have bumped lastAck even though there are earlier messages to still consume
+                            if (subAckPositions != null && !subAckPositions.isEmpty()
+                                    && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) {
+                                //we have messages to ack before lastAckedSequence
+                                sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
+                            } else {
+                                subAckPositions = null;
+                                sd.orderIndex.setBatch(tx, pos);
+                            }
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);
@@ -1053,6 +1080,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
+                            //If subAckPositions is set then verify the sequence set contains the message still
+                            //and if it doesn't skip it
+                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
+                                continue;
+                            }
                             if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                 counter++;
                             }
@@ -1451,6 +1483,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                 super(runnable, null);
             }
 
+            @Override
             public void setException(final Throwable e) {
                 super.setException(e);
             }
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 5fb1e90..8bb902d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2981,6 +2981,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return sd.subscriptionAcks.get(tx, subscriptionKey);
     }
 
+    protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        if (sd.ackPositions != null) {
+            final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+            return messageSequences;
+        }
+
+        return null;
+    }
+
     protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
         if (sd.ackPositions != null) {
             SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
new file mode 100644
index 0000000..66890a9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class KahaDBDurableMessageRecoveryTest {
+
+    @Parameters(name = "recoverIndex")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { false }, { true } });
+    }
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+    private BrokerService broker;
+    private URI brokerConnectURI;
+
+    private boolean recoverIndex;
+
+    @Before
+    public void setUpBroker() throws Exception {
+        startBroker(false);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * @param recoverIndex value passed to setForceRecoverIndex when the broker is restarted
+     */
+    public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) {
+        super();
+        this.recoverIndex = recoverIndex;
+    }
+
+    protected void startBroker(boolean recoverIndex) throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
+
+        TransportConnector connector = broker.addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+        configurePersistence(broker, recoverIndex);
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean forceRecoverIndex) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        adapter.setForceRecoverIndex(forceRecoverIndex);
+
+        // set smaller size for test
+        adapter.setJournalMaxFileLength(1024 * 20);
+    }
+
+    protected void restartBroker(boolean deleteIndex) throws Exception {
+        stopBroker();
+        startBroker(deleteIndex);
+    }
+
+    protected Session getSession(int ackMode) throws Exception {
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId1");
+        connection.start();
+        Session session = connection.createSession(false, ackMode);
+
+        return session;
+    }
+
+    /**
+     * Test that on broker restart a durable topic subscription will recover all
+     * messages before the "last ack" in KahaDB which could happen if using
+     * individual acknowledge mode and skipping messages
+     */
+    @Test
+    public void durableRecoveryIndividualAcknowledge() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+
+        // Receive all 10 messages but acknowledge only the 5th using individual ack mode
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            if (i == 5) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 9 messages left still and restart broker
+        assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        subscriber.close();
+        restartBroker(recoverIndex);
+
+        // Verify 9 messages exist in store on startup
+        assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+
+        // Recreate subscriber and try and receive the other 9 messages
+        session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
+        subscriber = session.createDurableSubscriber(topic, "sub1");
+
+        for (int i = 1; i <= 4; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 6; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        subscriber.close();
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+    }
+
+    @Test
+    public void multipleDurableRecoveryIndividualAcknowledge() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "sub1");
+        TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+
+        // Receive all 10 messages on the first sub but acknowledge only the 3rd and 7th (individual ack mode)
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            if (i == 3 || i == 7) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 8 messages left still and restart broker
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+        subscriber1.close();
+        subscriber2.close();
+        restartBroker(recoverIndex);
+
+        // Verify 8 messages exist in store on startup on sub 1 and 10 on sub 2
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+
+        // Recreate subscriber and try and receive the other 8 messages
+        session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
+        subscriber1 = session.createDurableSubscriber(topic, "sub1");
+        subscriber2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 1; i <= 2; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 4; i <= 6; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 8; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        // Make sure sub 2 gets all 10
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber2.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        subscriber1.close();
+        subscriber2.close();
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+    }
+
+    @Test
+    public void multipleDurableTestRecoverSubscription() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "sub1");
+        TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+
+        // Receive all 10 messages on the first sub but acknowledge only the 3rd and 7th (individual ack mode)
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            if (i == 3 || i == 7) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 8 messages left on sub 1 and 10 on sub2 and restart
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+        subscriber1.close();
+        subscriber2.close();
+        restartBroker(recoverIndex);
+
+        //Manually recover subscription and verify proper messages are loaded
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
+        final AtomicInteger sub1Recovered = new AtomicInteger();
+        final AtomicInteger sub2Recovered = new AtomicInteger();
+        store.recoverSubscription("clientId1", "sub1", new MessageRecoveryListener() {
+            @Override
+            public boolean recoverMessageReference(MessageId ref) throws Exception {
+                return false;
+            }
+
+            @Override
+            public boolean recoverMessage(Message message) throws Exception {
+                TextMessage textMessage = (TextMessage) message;
+                if (textMessage.getText().equals("msg: " + 3) || textMessage.getText().equals("msg: " + 7)) {
+                    throw new IllegalStateException("Got wrong message: " + textMessage.getText());
+                }
+                sub1Recovered.incrementAndGet();
+                return true;
+            }
+
+            @Override
+            public boolean isDuplicate(MessageId ref) {
+                return false;
+            }
+
+            @Override
+            public boolean hasSpace() {
+                return true;
+            }
+        });
+
+        store.recoverSubscription("clientId1", "sub2", new MessageRecoveryListener() {
+            @Override
+            public boolean recoverMessageReference(MessageId ref) throws Exception {
+                return false;
+            }
+
+            @Override
+            public boolean recoverMessage(Message message) throws Exception {
+                sub2Recovered.incrementAndGet();
+                return true;
+            }
+
+            @Override
+            public boolean isDuplicate(MessageId ref) {
+                return false;
+            }
+
+            @Override
+            public boolean hasSpace() {
+                return true;
+            }
+        });
+
+        //Verify proper number of messages are recovered
+        assertEquals(8, sub1Recovered.get());
+        assertEquals(10, sub2Recovered.get());
+    }
+
+    protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception {
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
+        return store.getMessageCount(clientId, subId);
+    }
+}

Reply | Threaded
Open this post in threaded view
|

[activemq] 02/02: AMQ-7129 - minor junit test fix

cshannon
In reply to this post by cshannon
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 602e382f1deb52002986ba7e308111088cc249a8
Author: Christopher L. Shannon (cshannon) <[hidden email]>
AuthorDate: Wed Jan 9 14:39:11 2019 -0500

    AMQ-7129 - minor junit test fix
   
    (cherry picked from commit 703b8cbda39f4a1263e7ecfbb1eb1ec247f91162)
---
 .../apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
index 66890a9..a44e8c0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -55,7 +55,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class KahaDBDurableMessageRecoveryTest {
 
-    @Parameters(name = "recoverIndex")
+    @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] { { false }, { true } });
     }