[1/2] activemq-artemis git commit: ARTEMIS-1114 Missing records after compacting

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/2] activemq-artemis git commit: ARTEMIS-1114 Missing records after compacting

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master ec161fc15 -> 09958aa54


ARTEMIS-1114 Missing records after compacting

This is fixing an issue introduced on 4b47461f03a607b9ef517beb2a1666ffae43a2a7 (ARTEMIS-822)
The Transactions were being looked up without the readLock and some of the controls for Read and Write lock
were broken after this.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ddacda50
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ddacda50
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ddacda50

Branch: refs/heads/master
Commit: ddacda50626ef2cd5ccf74a3149eccbbda4a9d84
Parents: ec161fc
Author: Clebert Suconic <[hidden email]>
Authored: Thu Apr 13 16:53:53 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Apr 14 01:13:46 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/utils/SimpleFuture.java    |  74 +++---
 .../artemis/utils/SimpleFutureImpl.java         |  81 ++++++
 .../artemis/utils/SimpleFutureTest.java         |   4 +-
 .../artemis/core/journal/impl/JournalBase.java  |   9 +-
 .../core/journal/impl/JournalCompactor.java     |  85 ++++++
 .../artemis/core/journal/impl/JournalImpl.java  | 265 ++++++++++++-------
 .../core/journal/impl/JournalTransaction.java   |  10 +
 .../journal/NIOJournalCompactTest.java          | 103 ++++++-
 .../journal/impl/AlignedJournalImplTest.java    |   4 +-
 .../core/journal/impl/JournalAsyncTest.java     |   1 +
 .../core/journal/impl/JournalImplTestBase.java  |  42 ++-
 11 files changed, 513 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
index eedfef4..b871f24 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
@@ -17,63 +17,55 @@
 
 package org.apache.activemq.artemis.utils;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public class SimpleFuture<V> implements Future<V> {
+public interface SimpleFuture<V> extends Future<V> {
 
-   public SimpleFuture() {
-   }
+   SimpleFuture dumb = new SimpleFuture() {
+      @Override
+      public void fail(Throwable e) {
 
-   V value;
-   Exception exception;
+      }
 
-   private final CountDownLatch latch = new CountDownLatch(1);
+      @Override
+      public void set(Object o) {
 
-   boolean canceled = false;
+      }
 
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-      canceled = true;
-      latch.countDown();
-      return true;
-   }
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+         return false;
+      }
 
-   @Override
-   public boolean isCancelled() {
-      return canceled;
-   }
+      @Override
+      public boolean isCancelled() {
+         return false;
+      }
 
-   @Override
-   public boolean isDone() {
-      return latch.getCount() <= 0;
-   }
+      @Override
+      public boolean isDone() {
+         return false;
+      }
 
-   public void fail(Exception e) {
-      this.exception = e;
-      latch.countDown();
-   }
+      @Override
+      public Object get() throws InterruptedException, ExecutionException {
+         return null;
+      }
 
-   @Override
-   public V get() throws InterruptedException, ExecutionException {
-      latch.await();
-      if (this.exception != null) {
-         throw new ExecutionException(this.exception);
+      @Override
+      public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+         return null;
       }
-      return value;
-   }
+   };
 
-   public void set(V v) {
-      this.value = v;
-      latch.countDown();
+   static SimpleFuture dumb() {
+      return dumb;
    }
 
-   @Override
-   public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-      latch.await(timeout, unit);
-      return value;
-   }
+   void fail(Throwable e);
+
+   void set(V v);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFutureImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFutureImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFutureImpl.java
new file mode 100644
index 0000000..fae91c2
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFutureImpl.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class SimpleFutureImpl<V> implements SimpleFuture<V> {
+
+   public SimpleFutureImpl() {
+   }
+
+   V value;
+   Throwable exception;
+
+   private final CountDownLatch latch = new CountDownLatch(1);
+
+   boolean canceled = false;
+
+   @Override
+   public boolean cancel(boolean mayInterruptIfRunning) {
+      canceled = true;
+      latch.countDown();
+      return true;
+   }
+
+   @Override
+   public boolean isCancelled() {
+      return canceled;
+   }
+
+   @Override
+   public boolean isDone() {
+      return latch.getCount() <= 0;
+   }
+
+   @Override
+   public void fail(Throwable e) {
+      this.exception = e;
+      latch.countDown();
+   }
+
+   @Override
+   public V get() throws InterruptedException, ExecutionException {
+      latch.await();
+      if (this.exception != null) {
+         throw new ExecutionException(this.exception);
+      }
+      return value;
+   }
+
+   @Override
+   public void set(V v) {
+      this.value = v;
+      latch.countDown();
+   }
+
+   @Override
+   public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      latch.await(timeout, unit);
+      return value;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
index 00fd5d7..c3fa482 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
@@ -29,7 +29,7 @@ public class SimpleFutureTest {
    @Test
    public void testFuture() throws Exception {
       final long randomStart = System.currentTimeMillis();
-      final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
+      final SimpleFuture<Long> simpleFuture = new SimpleFutureImpl<>();
       Thread t = new Thread() {
          @Override
          public void run() {
@@ -44,7 +44,7 @@ public class SimpleFutureTest {
 
    @Test
    public void testException() throws Exception {
-      final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
+      final SimpleFuture<Long> simpleFuture = new SimpleFutureImpl<>();
       Thread t = new Thread() {
          @Override
          public void run() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index e6bd99e..2c03f92 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -162,13 +162,10 @@ abstract class JournalBase implements Journal {
    abstract void scheduleReclaim();
 
    protected SyncIOCompletion getSyncCallback(final boolean sync) {
-      if (supportsCallback) {
-         if (sync) {
-            return new SimpleWaitIOCallback();
-         }
-         return DummyCallback.getInstance();
+      if (sync) {
+         return new SimpleWaitIOCallback();
       }
-      return null;
+      return DummyCallback.getInstance();
    }
 
    private static final class NullEncoding implements EncodingSupport {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index 1ac5676..8b89c3e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.journal.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -139,10 +140,16 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
     * This methods informs the Compactor about the existence of a pending (non committed) transaction
     */
    public void addPendingTransaction(final long transactionID, final long[] ids) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("addPendingTransaction::tx=" + transactionID + ", ids=" + Arrays.toString(ids));
+      }
       pendingTransactions.put(transactionID, new PendingTransaction(ids));
    }
 
    public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("addCommandCommit " + liveTransaction.getId());
+      }
       pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile));
 
       long[] ids = liveTransaction.getPositiveArray();
@@ -170,6 +177,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    }
 
    public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("addCommandRollback " + liveTransaction + " currentFile " + currentFile);
+      }
       pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile));
    }
 
@@ -178,6 +188,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
     * @param usedFile
     */
    public void addCommandDelete(final long id, final JournalFile usedFile) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("addCommandDelete id " + id + " usedFile " + usedFile);
+      }
       pendingCommands.add(new DeleteCompactCommand(id, usedFile));
    }
 
@@ -186,6 +199,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
     * @param usedFile
     */
    public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size);
+      }
       pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
    }
 
@@ -241,6 +257,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
     */
    public void replayPendingCommands() {
       for (CompactCommand command : pendingCommands) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Replay " + command);
+         }
          try {
             command.execute();
          } catch (Exception e) {
@@ -256,6 +275,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
    @Override
    public void onReadAddRecord(final RecordInfo info) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Read Record " + info);
+      }
       if (lookupRecord(info.id)) {
          JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
          addRecord.setCompactCount((short) (info.compactCount + 1));
@@ -270,6 +292,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
    @Override
    public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Read Add Recprd TX " + transactionID + " info " + info);
+      }
       if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -288,6 +313,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    @Override
    public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
 
+      if (logger.isTraceEnabled()) {
+         logger.trace("onReadCommitRecord " + transactionID);
+      }
+
       if (pendingTransactions.get(transactionID) != null) {
          // Sanity check, this should never happen
          ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID);
@@ -307,6 +336,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
    @Override
    public void onReadDeleteRecord(final long recordID) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("onReadDeleteRecord " + recordID);
+      }
+
       if (newRecords.get(recordID) != null) {
          // Sanity check, it should never happen
          ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompactingDelete(recordID);
@@ -316,6 +349,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
    @Override
    public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("onReadDeleteRecordTX " + transactionID + " info " + info);
+      }
+
       if (pendingTransactions.get(transactionID) != null) {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -339,6 +376,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    public void onReadPrepareRecord(final long transactionID,
                                    final byte[] extraData,
                                    final int numberOfRecords) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("onReadPrepareRecord " + transactionID);
+      }
+
       if (pendingTransactions.get(transactionID) != null) {
 
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -356,6 +397,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
    @Override
    public void onReadRollbackRecord(final long transactionID) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("onReadRollbackRecord " + transactionID);
+      }
+
       if (pendingTransactions.get(transactionID) != null) {
          // Sanity check, this should never happen
          throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
@@ -378,6 +423,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
    @Override
    public void onReadUpdateRecord(final RecordInfo info) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("onReadUpdateRecord " + info);
+      }
+
       if (lookupRecord(info.id)) {
          JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
 
@@ -399,6 +448,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
 
    @Override
    public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("onReadUpdateRecordTX " + info);
+      }
+
       if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
          JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
 
@@ -423,8 +476,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
    private JournalTransaction getNewJournalTransaction(final long transactionID) {
       JournalTransaction newTransaction = newTransactions.get(transactionID);
       if (newTransaction == null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("creating new journal Transaction " + transactionID);
+         }
          newTransaction = new JournalTransaction(transactionID, this);
          newTransactions.put(transactionID, newTransaction);
+      } else if (logger.isTraceEnabled()) {
+         // just logging
+         logger.trace("reusing TX " + transactionID);
+
       }
       return newTransaction;
    }
@@ -485,6 +545,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
          JournalRecord updateRecord = journal.getRecords().get(id);
          updateRecord.addUpdateFile(usedFile, size);
       }
+
+      @Override
+      public String toString() {
+         return "UpdateCompactCommand{" +
+            "id=" + id +
+            ", usedFile=" + usedFile +
+            ", size=" + size +
+            '}';
+      }
    }
 
    private class CommitCompactCommand extends CompactCommand {
@@ -510,6 +579,14 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
          }
          newTransactions.remove(liveTransaction.getId());
       }
+
+      @Override
+      public String toString() {
+         return "CommitCompactCommand{" +
+            "commitFile=" + commitFile +
+            ", liveTransaction=" + liveTransaction +
+            '}';
+      }
    }
 
    private class RollbackCompactCommand extends CompactCommand {
@@ -535,6 +612,14 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
          }
          newTransactions.remove(liveTransaction.getId());
       }
+
+      @Override
+      public String toString() {
+         return "RollbackCompactCommand{" +
+            "liveTransaction=" + liveTransaction +
+            ", rollbackFile=" + rollbackFile +
+            '}';
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 24bb916..81ae9c0 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -78,6 +78,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.SimpleFuture;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
 import org.jboss.logging.Logger;
 
 /**
@@ -619,6 +620,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             // At this point everything is checked. So we relax and just load
             // the data now.
 
+            if (logger.isTraceEnabled()) {
+               logger.trace("reading " + recordID + ", userRecordType=" + userRecordType + ", compactCount=" + compactCount);
+            }
+
             switch (recordType) {
                case ADD_RECORD: {
                   reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
@@ -721,6 +726,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       lineUpContext(callback);
       pendingRecords.add(id);
 
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendAddRecord::id=" + id +
+                         ", userRecordType=" +
+                         recordType +
+                         ", record = " + record);
+      }
+
 
       final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
       appendExecutor.execute(new Runnable() {
@@ -740,13 +752,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                              ", usedFile = " +
                                              usedFile);
                }
-               if (result != null) {
-                  result.set(true);
-               }
-            } catch (Exception e) {
-               if (result != null) {
-                  result.fail(e);
-               }
+               result.set(true);
+            } catch (Throwable e) {
+               result.fail(e);
+               setErrorCondition(callback, null, e);
                logger.error("appendAddRecord::"  + e, e);
             } finally {
                pendingRecords.remove(id);
@@ -755,9 +764,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (result != null) {
-         result.get();
-      }
+      result.get();
    }
 
    @Override
@@ -771,6 +778,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       lineUpContext(callback);
       checkKnownRecordID(id);
 
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendUpdateRecord::id=" + id +
+                         ", userRecordType=" +
+                         recordType);
+      }
+
       final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
 
       appendExecutor.execute(new Runnable() {
@@ -798,13 +811,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                   jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
                }
 
-               if (result != null) {
-                  result.set(true);
-               }
+               result.set(true);
             } catch (Exception e) {
-               if (result != null) {
-                  result.fail(e);
-               }
+               result.fail(e);
+               setErrorCondition(callback, null, e);
                logger.error("appendUpdateRecord:" + e, e);
             } finally {
                journalLock.readLock().unlock();
@@ -812,13 +822,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (result != null) {
-         result.get();
-      }
+      result.get();
    }
 
    @Override
    public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendDeleteRecord::id=" + id);
+      }
+
+
       checkJournalIsLoaded();
       lineUpContext(callback);
       checkKnownRecordID(id);
@@ -848,13 +862,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                } else {
                   record.delete(usedFile);
                }
-               if (result != null) {
-                  result.set(true);
-               }
+               result.set(true);
             } catch (Exception e) {
-               if (result != null) {
-                  result.fail(e);
-               }
+               result.fail(e);
                logger.error("appendDeleteRecord:" + e, e);
             } finally {
                journalLock.readLock().unlock();
@@ -862,13 +872,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
-      if (result != null) {
-         result.get();
-      }
+      result.get();
    }
 
-   private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
-      return (sync && callback == null) ? new SimpleFuture<Boolean>() : null;
+   private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
+      return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb();
    }
 
    @Override
@@ -878,16 +886,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                             final Persister persister,
                                             final Object record) throws Exception {
       checkJournalIsLoaded();
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendAddRecordTransactional:txID=" + txID +
+                         ",id=" +
+                         id +
+                         ", userRecordType=" +
+                         recordType +
+                         ", record = " + record);
+      }
 
-      final JournalTransaction tx = getTransactionInfo(txID);
-      tx.checkErrorCondition();
 
       appendExecutor.execute(new Runnable() {
 
          @Override
          public void run() {
             journalLock.readLock().lock();
+
+            final JournalTransaction tx = getTransactionInfo(txID);
+
             try {
+               if (tx != null) {
+                  tx.checkErrorCondition();
+               }
                JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
                JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
 
@@ -905,7 +925,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                tx.addPositive(usedFile, id, addRecord.getEncodeSize());
             } catch (Exception e) {
                logger.error("appendAddRecordTransactional:" + e, e);
-               setErrorCondition(tx, e);
+               setErrorCondition(null, tx, e);
             } finally {
                journalLock.readLock().unlock();
             }
@@ -918,7 +938,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          return;
       }
 
-      final SimpleFuture<Boolean> known = new SimpleFuture<>();
+      final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
 
       // retry on the append thread. maybe the appender thread is not keeping up.
       appendExecutor.execute(new Runnable() {
@@ -957,17 +977,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                                final byte recordType,
                                                final Persister persister,
                                                final Object record) throws Exception {
+      if ( logger.isTraceEnabled() ) {
+         logger.trace( "scheduling appendUpdateRecordTransactional::txID=" + txID +
+                          ",id=" +
+                          id +
+                          ", userRecordType=" +
+                          recordType +
+                          ", record = " + record);
+      }
+
       checkJournalIsLoaded();
 
-      final JournalTransaction tx = getTransactionInfo(txID);
-      tx.checkErrorCondition();
 
       appendExecutor.execute(new Runnable() {
 
          @Override
          public void run() {
             journalLock.readLock().lock();
+
+            final JournalTransaction tx = getTransactionInfo(txID);
+
             try {
+               tx.checkErrorCondition();
 
                JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record );
                JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
@@ -986,7 +1017,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
             } catch ( Exception e ) {
                logger.error("appendUpdateRecordTransactional:" +  e.getMessage(), e );
-               setErrorCondition( tx, e );
+               setErrorCondition(null, tx, e );
             } finally {
                journalLock.readLock().unlock();
             }
@@ -998,16 +1029,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void appendDeleteRecordTransactional(final long txID,
                                                final long id,
                                                final EncodingSupport record) throws Exception {
-      checkJournalIsLoaded();
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendDeleteRecordTransactional::txID=" + txID +
+                         ", id=" +
+                         id);
+      }
+
 
-      final JournalTransaction tx = getTransactionInfo(txID);
-      tx.checkErrorCondition();
+      checkJournalIsLoaded();
 
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
+
+            final JournalTransaction tx = getTransactionInfo(txID);
+
             try {
+               if (tx != null) {
+                  tx.checkErrorCondition();
+               }
 
                JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
                JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
@@ -1023,7 +1064,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                tx.addNegative(usedFile, id);
             } catch (Exception e) {
                logger.error("appendDeleteRecordTransactional:" + e, e);
-               setErrorCondition(tx, e);
+               setErrorCondition(null, tx, e);
             } finally {
                journalLock.readLock().unlock();
             }
@@ -1050,16 +1091,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       checkJournalIsLoaded();
       lineUpContext(callback);
 
-      final JournalTransaction tx = getTransactionInfo(txID);
-      tx.checkErrorCondition();
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendPrepareRecord::txID=" + txID);
+      }
 
-      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+      final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
 
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
+
+
+            final JournalTransaction tx = getTransactionInfo(txID);
+
             try {
+               tx.checkErrorCondition();
                JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
                JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
 
@@ -1068,23 +1115,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                }
 
                tx.prepare(usedFile);
-               if (result != null) {
-                  result.set(true);
-               }
             } catch (Exception e) {
-               if (result != null) {
-                  result.fail(e);
-               }
+               result.fail(e);
                logger.error("appendPrepareRecord:" + e, e);
-               setErrorCondition(tx, e);
+               setErrorCondition(callback, tx, e);
             } finally {
                journalLock.readLock().unlock();
+               result.set(tx);
             }
          }
       });
 
-      if (result != null) {
-         result.get();
+      JournalTransaction tx = result.get();
+      if (tx != null) {
          tx.checkErrorCondition();
       }
    }
@@ -1096,12 +1139,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
    }
 
-   private void setErrorCondition(JournalTransaction jt, Throwable t) {
+   private void setErrorCondition(IOCallback otherCallback, JournalTransaction jt, Throwable t) {
+      TransactionCallback callback = null;
       if (jt != null) {
-         TransactionCallback callback = jt.getCurrentCallback();
+         callback = jt.getCurrentCallback();
          if (callback != null && callback.getErrorMessage() != null) {
             callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
          }
+
+      }
+
+      if (otherCallback != null && otherCallback != callback) {
+         otherCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
       }
    }
 
@@ -1118,46 +1167,49 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          lineUpContext(callback);
       }
 
-      final JournalTransaction tx = transactions.remove(txID);
 
-      if (tx == null) {
-         throw new IllegalStateException("Cannot find tx with id " + txID);
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendCommitRecord::txID=" + txID );
       }
 
-      tx.checkErrorCondition();
-      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+      JournalTransaction txcheck = transactions.get(txID);
+      if (txcheck != null) {
+         txcheck.checkErrorCondition();
+      }
+
+
+      final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
 
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
+            // cannot remove otherwise compact may get lost
+            final JournalTransaction tx = transactions.remove(txID);
+
             try {
+               if (tx == null) {
+                  throw new IllegalStateException("Cannot find tx with id " + txID);
+               }
+
                JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
                JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
 
 
-               if (logger.isTraceEnabled()) {
-                  logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
-               }
-
                tx.commit(usedFile);
-               if (result != null) {
-                  result.set(true);
-               }
-            } catch (Exception e) {
-               if (result != null) {
-                  result.fail(e);
-               }
+            } catch (Throwable e) {
+               result.fail(e);
                logger.error("appendCommitRecord:" + e, e);
-               setErrorCondition(tx, e);
+               setErrorCondition(callback, tx, e);
             } finally {
                journalLock.readLock().unlock();
+               result.set(tx);
             }
          }
       });
 
-      if (result != null) {
-         result.get();
+      JournalTransaction tx = result.get();
+      if (tx != null) {
          tx.checkErrorCondition();
       }
    }
@@ -1167,40 +1219,47 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       checkJournalIsLoaded();
       lineUpContext(callback);
 
-      final JournalTransaction tx = transactions.remove(txID);
 
-      if (tx == null) {
-         throw new IllegalStateException("Cannot find tx with id " + txID);
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendRollbackRecord::txID=" + txID );
       }
 
-      tx.checkErrorCondition();
-      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
+
+
+      final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
             journalLock.readLock().lock();
+
+            final JournalTransaction tx = transactions.remove(txID);
             try {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendRollbackRecord::txID=" + txID );
+               }
+
+               if (tx == null) {
+                  throw new IllegalStateException("Cannot find tx with id " + txID);
+               }
+
+
                JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
                JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
 
                tx.rollback(usedFile);
-               if (result != null) {
-                  result.set(true);
-               }
-            } catch (Exception e) {
-               if (result != null) {
-                  result.fail(e);
-               }
+            } catch (Throwable e) {
+               result.fail(e);
                logger.error("appendRollbackRecord:" + e, e);
-               setErrorCondition(tx, e);
+               setErrorCondition(callback, tx, e);
             }  finally {
                journalLock.readLock().unlock();
+               result.set(tx);
             }
          }
       });
 
-      if (result != null) {
-         result.get();
+      JournalTransaction tx = result.get();
+      if (tx != null) {
          tx.checkErrorCondition();
       }
    }
@@ -1545,6 +1604,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       } finally {
          compactorLock.writeLock().unlock();
+         if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
+            ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact finishing");
+         }
+
+
       }
 
    }
@@ -2544,7 +2608,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             }
             callback = txcallback;
          } else {
-            callback = null;
+            callback = parameterCallback;
          }
 
          // We need to add the number of records on currentFile if prepare or commit
@@ -2591,19 +2655,24 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    }
 
    private JournalTransaction getTransactionInfo(final long txID) {
-      JournalTransaction tx = transactions.get(txID);
+      journalLock.readLock().lock();
+      try {
+         JournalTransaction tx = transactions.get(txID);
 
-      if (tx == null) {
-         tx = new JournalTransaction(txID, this);
+         if (tx == null) {
+            tx = new JournalTransaction(txID, this);
 
-         JournalTransaction trans = transactions.putIfAbsent(txID, tx);
+            JournalTransaction trans = transactions.putIfAbsent(txID, tx);
 
-         if (trans != null) {
-            tx = trans;
+            if (trans != null) {
+               tx = trans;
+            }
          }
-      }
 
-      return tx;
+         return tx;
+      } finally {
+         journalLock.readLock().unlock();
+      }
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index 8e40f3b..36d585a 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -28,9 +28,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
+import org.jboss.logging.Logger;
 
 public class JournalTransaction {
 
+   private static final Logger logger = Logger.getLogger(JournalTransaction.class);
+
    private JournalRecordProvider journal;
 
    private List<JournalUpdate> pos;
@@ -229,10 +232,17 @@ public class JournalTransaction {
    public void commit(final JournalFile file) {
       JournalCompactor compactor = journal.getCompactor();
 
+      // The race lies here....
       if (compacting && compactor != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("adding tx " + this.id + " into compacting");
+         }
          compactor.addCommandCommit(this, file);
       } else {
 
+         if (logger.isTraceEnabled()) {
+            logger.trace("no compact commit " + this.id);
+         }
          if (pos != null) {
             for (JournalUpdate trUpdate : pos) {
                JournalRecord posFiles = journal.getRecords().get(trUpdate.id);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index a0f23d0..14f3393 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -53,12 +54,15 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.jboss.logging.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class NIOJournalCompactTest extends JournalImplTestBase {
 
+   private static final Logger logger = Logger.getLogger(NIOJournalCompactTest.class);
+
    private static final int NUMBER_OF_RECORDS = 1000;
 
    IDGenerator idGenerator = new SimpleIDGenerator(100000);
@@ -783,6 +787,97 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
    }
 
    @Test
+   public void testLoopStressAppends() throws Exception {
+      for (int i = 0; i < 10; i++) {
+         logger.info("repetition " + i);
+         testStressAppends();
+         tearDown();
+         setUp();
+      }
+   }
+
+   @Test
+   public void testStressAppends() throws Exception {
+      setup(2, 60 * 1024, true);
+
+      final int NUMBER_OF_RECORDS = 200;
+
+      SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+      createJournal();
+      journal.setAutoReclaim(false);
+
+      startJournal();
+      load();
+
+      AtomicBoolean running = new AtomicBoolean(true);
+      Thread t = new Thread() {
+         @Override
+         public void run() {
+            while (running.get()) {
+               journal.testCompact();
+            }
+         }
+      };
+      t.start();
+
+
+      for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
+         long tx = idGen.generateID();
+         addTx(tx, idGen.generateID());
+         LockSupport.parkNanos(1000);
+         commit(tx);
+      }
+
+
+      running.set(false);
+
+      t.join(50000);
+      if (t.isAlive()) {
+         t.interrupt();
+         Assert.fail("supposed to join thread");
+      }
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   @Test
+   public void testSimpleCommitCompactInBetween() throws Exception {
+      setup(2, 60 * 1024, false);
+
+      final int NUMBER_OF_RECORDS = 1;
+
+      SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+      createJournal();
+      journal.setAutoReclaim(false);
+
+      startJournal();
+      load();
+
+
+      for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
+         long tx = idGen.generateID();
+         addTx(tx, idGen.generateID());
+         journal.testCompact();
+         journal.testCompact();
+         journal.testCompact();
+         journal.testCompact();
+         logger.info("going to commit");
+         commit(tx);
+      }
+
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   @Test
    public void testCompactAddAndUpdateFollowedByADelete2() throws Exception {
 
       setup(2, 60 * 1024, false);
@@ -917,8 +1012,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
       journal.testCompact();
 
-      System.out.println("Debug after compact\n" + journal.debug());
-
       stopJournal();
       createJournal();
       startJournal();
@@ -1666,10 +1759,14 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
                      survivingMsgs.add(message.getMessageID());
 
+                     logger.info("Going to store " + message);
                      // This one will stay here forever
                      storage.storeMessage(message);
+                     logger.info("message storeed " + message);
 
+                     logger.info("Going to commit " + tx);
                      storage.commit(tx);
+                     logger.info("Commited " + tx);
 
                      ctx.executeOnCompletion(new IOCallback() {
                         @Override
@@ -1749,6 +1846,8 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
          assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS));
 
+         Assert.assertEquals(0, errors.get());
+
       } catch (Throwable e) {
          e.printStackTrace();
          throw e;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index be6e5b3..b85be80 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -371,7 +371,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
       Assert.assertEquals(0, transactions.size());
 
       try {
-         journalImpl.appendCommitRecord(1L, false);
+         journalImpl.appendCommitRecord(1L, true);
          // This was supposed to throw an exception, as the transaction was
          // forgotten (interrupted by a reload).
          Assert.fail("Supposed to throw exception");
@@ -419,7 +419,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
       Assert.assertEquals((Long) 78L, incompleteTransactions.get(1));
 
       try {
-         journalImpl.appendCommitRecord(77L, false);
+         journalImpl.appendCommitRecord(77L, true);
          // This was supposed to throw an exception, as the transaction was
          // forgotten (interrupted by a reload).
          Assert.fail("Supposed to throw exception");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
index 204600e..00539a7 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
@@ -138,6 +138,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
 
       try {
          journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0));
+         journalImpl.appendCommitRecord(1L, true);
          Assert.fail("Exception expected");
          // An exception already happened in one of the elements on this transaction.
          // We can't accept any more elements on the transaction

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ddacda50/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
index 3d70b1a..e5650cb 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -36,12 +37,15 @@ import org.apache.activemq.artemis.core.journal.TestableJournal;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 
 public abstract class JournalImplTestBase extends ActiveMQTestBase {
 
+   private static final Logger logger = Logger.getLogger(JournalImplTestBase.class);
+
    protected List<RecordInfo> records = new LinkedList<>();
 
    protected TestableJournal journal;
@@ -156,13 +160,11 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
          @Override
          public void onCompactDone() {
             latchDone.countDown();
-            System.out.println("Waiting on Compact");
             try {
                latchWait.await();
             } catch (InterruptedException e) {
                e.printStackTrace();
             }
-            System.out.println("Waiting on Compact Done");
          }
       };
 
@@ -520,19 +522,31 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
     * @param actual
     */
    protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual) {
-      System.out.println("***********************************************");
-      System.out.println("Expected list:");
-      for (RecordInfo info : expected) {
-         System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
-      }
-      if (actual != null) {
-         System.out.println("***********************************************");
-         System.out.println("Actual list:");
-         for (RecordInfo info : actual) {
-            System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
-         }
+
+      HashSet<RecordInfo> expectedSet = new HashSet<>();
+      expectedSet.addAll(expected);
+
+
+      Assert.assertEquals("There are duplicated on the expected list", expectedSet.size(), expected.size());
+
+      HashSet<RecordInfo> actualSet = new HashSet<>();
+      actualSet.addAll(actual);
+
+      expectedSet.removeAll(actualSet);
+
+      for (RecordInfo info: expectedSet) {
+         logger.warn("The following record is missing:: " + info);
       }
-      System.out.println("***********************************************");
+
+
+      Assert.assertEquals("There are duplicates on the actual list", actualSet.size(), actualSet.size());
+
+
+
+      RecordInfo[] expectedArray = expected.toArray(new RecordInfo[expected.size()]);
+      RecordInfo[] actualArray = actual.toArray(new RecordInfo[actual.size()]);
+      Assert.assertArrayEquals(expectedArray, actualArray);
+
    }
 
    protected byte[] generateRecord(final int length) {

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/2] activemq-artemis git commit: This closes #1205

clebertsuconic-2
This closes #1205


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/09958aa5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/09958aa5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/09958aa5

Branch: refs/heads/master
Commit: 09958aa540d3e8d8f431acc2c9b21a86c346c453
Parents: ec161fc ddacda5
Author: Clebert Suconic <[hidden email]>
Authored: Fri Apr 14 01:17:06 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Apr 14 01:17:06 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/utils/SimpleFuture.java    |  74 +++---
 .../artemis/utils/SimpleFutureImpl.java         |  81 ++++++
 .../artemis/utils/SimpleFutureTest.java         |   4 +-
 .../artemis/core/journal/impl/JournalBase.java  |   9 +-
 .../core/journal/impl/JournalCompactor.java     |  85 ++++++
 .../artemis/core/journal/impl/JournalImpl.java  | 265 ++++++++++++-------
 .../core/journal/impl/JournalTransaction.java   |  10 +
 .../journal/NIOJournalCompactTest.java          | 103 ++++++-
 .../journal/impl/AlignedJournalImplTest.java    |   4 +-
 .../core/journal/impl/JournalAsyncTest.java     |   1 +
 .../core/journal/impl/JournalImplTestBase.java  |  42 ++-
 11 files changed, 513 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


Loading...