[1/2] activemq-artemis git commit: ARTEMIS-1328 Improving direct delivery check

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/2] activemq-artemis git commit: ARTEMIS-1328 Improving direct delivery check

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 246b8ef77 -> d4a7aebb6


ARTEMIS-1328 Improving direct delivery check

Instead of wait to flush an executor,
I have added a method isFlushed() which will just translate to the
state on the OrderedExecutor.

In the case another executor is provided (for tests) there's a delegate
into normal executors.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1ace3061
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1ace3061
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1ace3061

Branch: refs/heads/master
Commit: 1ace3061217340e4d5dae67d75532ec48efe32fb
Parents: 246b8ef
Author: Clebert Suconic <[hidden email]>
Authored: Mon Aug 7 23:48:29 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Aug 8 14:00:58 2017 -0400

----------------------------------------------------------------------
 .../artemis/cli/commands/tools/PrintData.java   |  6 +-
 .../cli/commands/tools/xml/XmlDataExporter.java |  6 +-
 .../activemq/artemis/utils/ExecutorFactory.java |  4 +-
 .../artemis/utils/actors/ArtemisExecutor.java   | 69 ++++++++++++++++++++
 .../artemis/utils/actors/OrderedExecutor.java   |  2 +-
 .../utils/actors/OrderedExecutorFactory.java    |  2 +-
 .../artemis/utils/actors/ProcessorBase.java     |  4 ++
 .../artemis/core/io/JournalTptBenchmark.java    |  4 +-
 .../artemis/core/paging/PagingStoreFactory.java |  3 +-
 .../core/paging/cursor/PageSubscription.java    |  5 +-
 .../cursor/impl/PageCursorProviderImpl.java     |  6 +-
 .../cursor/impl/PageSubscriptionImpl.java       |  8 +--
 .../paging/impl/PagingStoreFactoryDatabase.java |  3 +-
 .../core/paging/impl/PagingStoreFactoryNIO.java |  3 +-
 .../core/paging/impl/PagingStoreImpl.java       |  6 +-
 .../core/server/impl/LastValueQueue.java        |  6 +-
 .../artemis/core/server/impl/QueueImpl.java     | 58 +++++++++-------
 .../integration/client/HangConsumerTest.java    |  7 +-
 .../client/InterruptedLargeMessageTest.java     | 15 ++---
 .../tests/integration/paging/PagingTest.java    |  6 +-
 .../timing/core/server/impl/QueueImplTest.java  |  9 ++-
 .../core/paging/impl/PagingStoreImplTest.java   |  8 +--
 .../unit/core/server/impl/QueueImplTest.java    |  3 +-
 .../server/impl/fakes/FakeQueueFactory.java     |  5 +-
 24 files changed, 167 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index 20cbfae..9a93c27 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -59,6 +58,7 @@ import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectReposito
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
 public class PrintData extends OptionalLocking {
@@ -146,8 +146,8 @@ public class PrintData extends OptionalLocking {
          final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
          ExecutorFactory execfactory = new ExecutorFactory() {
             @Override
-            public Executor getExecutor() {
-               return executor;
+            public ArtemisExecutor getExecutor() {
+               return ArtemisExecutor.delegate(executor);
             }
          };
          final StorageManager sm = new NullStorageManager();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
index d37c965..f297a76 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
@@ -31,7 +31,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -82,6 +81,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 
@@ -387,8 +387,8 @@ public final class XmlDataExporter extends OptionalLocking {
          final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
          ExecutorFactory executorFactory = new ExecutorFactory() {
             @Override
-            public Executor getExecutor() {
-               return executor;
+            public ArtemisExecutor getExecutor() {
+               return ArtemisExecutor.delegate(executor);
             }
          };
          PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduled, executorFactory, true, null);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
index dd0209b..7fe33f0 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.artemis.utils;
 
-import java.util.concurrent.Executor;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 public interface ExecutorFactory {
 
-   Executor getExecutor();
+   ArtemisExecutor getExecutor();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
new file mode 100644
index 0000000..d3036ec
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+public interface ArtemisExecutor extends Executor {
+
+   /**
+    * Artemis is supposed to implement this properly, however in tests or tools
+    * this can be used as a fake, doing a sipmle delegate and using the default methods implemented here.
+    * @param executor
+    * @return
+    */
+   static ArtemisExecutor delegate(Executor executor) {
+      return new ArtemisExecutor() {
+         @Override
+         public void execute(Runnable command) {
+            executor.execute(command);
+         }
+      };
+   }
+
+   default boolean flush() {
+      return flush(30, TimeUnit.SECONDS);
+   }
+
+   default boolean flush(long timeout, TimeUnit unit) {
+      CountDownLatch latch = new CountDownLatch(1);
+      Runnable runnable = new Runnable() {
+         @Override
+         public void run() {
+            latch.countDown();
+         }
+      };
+      execute(runnable);
+      try {
+         return latch.await(timeout, unit);
+      } catch (InterruptedException e) {
+         return false;
+      }
+   }
+
+   /**
+    * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
+    * @return
+    */
+   default boolean isFlushed() {
+      return flush(100, TimeUnit.MILLISECONDS);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java
index 6f0ee9a..8a02497 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java
@@ -28,7 +28,7 @@ import org.jboss.logging.Logger;
  * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
  * same method, will result in B's task running after A's.
  */
-public class OrderedExecutor extends ProcessorBase<Runnable> implements Executor {
+public class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
 
    public OrderedExecutor(Executor delegate) {
       super(delegate);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java
index da61f3d..07e1e7b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java
@@ -57,7 +57,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
     * @return an ordered executor
     */
    @Override
-   public Executor getExecutor() {
+   public ArtemisExecutor getExecutor() {
       return new OrderedExecutor(parent);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 07ed9e9..fcd197d 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -100,6 +100,10 @@ public abstract class ProcessorBase<T> {
       return stateUpdater.get(this) == STATE_NOT_RUNNING;
    }
 
+   public final boolean isFlushed() {
+      return stateUpdater.get(this) == STATE_NOT_RUNNING;
+   }
+
    protected void task(T command) {
       tasks.add(command);
       startPoller();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
index d0bada8..6c44296 100644
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.io;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -38,6 +37,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.jlibaio.LibaioContext;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 /**
  * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
@@ -91,7 +91,7 @@ public class JournalTptBenchmark {
       } else {
          final ArrayList<MpscArrayQueue<Runnable>> tasks = new ArrayList<>();
          service = Executors.newSingleThreadExecutor();
-         journal = new JournalImpl(() -> new Executor() {
+         journal = new JournalImpl(() -> new ArtemisExecutor() {
 
             private final MpscArrayQueue<Runnable> taskQueue = new MpscArrayQueue<>(1024);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
index 75799d2..6cfaf20 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 /**
  * The integration point between the PagingManger and the File System (aka SequentialFiles)
@@ -38,7 +39,7 @@ public interface PagingStoreFactory {
    PageCursorProvider newCursorProvider(PagingStore store,
                                         StorageManager storageManager,
                                         AddressSettings addressSettings,
-                                        Executor executor);
+                                        ArtemisExecutor executor);
 
    void stop() throws InterruptedException;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index cec7f52..985f563 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -16,13 +16,12 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
-import java.util.concurrent.Executor;
-
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 
 public interface PageSubscription {
@@ -155,7 +154,7 @@ public interface PageSubscription {
    /**
     * @return executor used by the PageSubscription
     */
-   Executor getExecutor();
+   ArtemisExecutor getExecutor();
 
    /**
     * @param deletedPage

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 701f86c..c1e1761 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -41,6 +40,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.SoftValueHashMap;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.jboss.logging.Logger;
 
 /**
@@ -68,7 +68,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    protected final StorageManager storageManager;
 
    // This is the same executor used at the PageStoreImpl. One Executor per pageStore
-   private final Executor executor;
+   private final ArtemisExecutor executor;
 
    private final SoftValueHashMap<Long, PageCache> softCache;
 
@@ -80,7 +80,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
    public PageCursorProviderImpl(final PagingStore pagingStore,
                                  final StorageManager storageManager,
-                                 final Executor executor,
+                                 final ArtemisExecutor executor,
                                  final int maxCacheSize) {
       this.pagingStore = pagingStore;
       this.storageManager = storageManager;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index f045151..fa2c748 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -28,7 +28,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -56,6 +55,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.FutureLatch;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.jboss.logging.Logger;
 
@@ -92,14 +92,14 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    private final PageSubscriptionCounter counter;
 
-   private final Executor executor;
+   private final ArtemisExecutor executor;
 
    private final AtomicLong deliveredCount = new AtomicLong(0);
 
    PageSubscriptionImpl(final PageCursorProvider cursorProvider,
                         final PagingStore pageStore,
                         final StorageManager store,
-                        final Executor executor,
+                        final ArtemisExecutor executor,
                         final Filter filter,
                         final long cursorId,
                         final boolean persistent) {
@@ -743,7 +743,7 @@ final class PageSubscriptionImpl implements PageSubscription {
    }
 
    @Override
-   public Executor getExecutor() {
+   public ArtemisExecutor getExecutor() {
       return executor;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 2daa89e..444c59c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -45,6 +45,7 @@ import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriv
 import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 /**
  * Integration point between Paging and JDBC
@@ -153,7 +154,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
    public PageCursorProvider newCursorProvider(PagingStore store,
                                                StorageManager storageManager,
                                                AddressSettings addressSettings,
-                                               Executor executor) {
+                                               ArtemisExecutor executor) {
       return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index c65b913..b2e3d4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 /**
  * Integration point between Paging and NIO
@@ -115,7 +116,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
    public PageCursorProvider newCursorProvider(PagingStore store,
                                                StorageManager storageManager,
                                                AddressSettings addressSettings,
-                                               Executor executor) {
+                                               ArtemisExecutor executor) {
       return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index ad9e218..6f85aa2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +61,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.utils.FutureLatch;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.jboss.logging.Logger;
 
 /**
@@ -101,7 +101,7 @@ public class PagingStoreImpl implements PagingStore {
 
    private final boolean usingGlobalMaxSize;
 
-   private final Executor executor;
+   private final ArtemisExecutor executor;
 
    // Bytes consumed by the queue on the memory
    private final AtomicLong sizeInBytes = new AtomicLong();
@@ -137,7 +137,7 @@ public class PagingStoreImpl implements PagingStore {
                           final PagingStoreFactory storeFactory,
                           final SimpleString storeName,
                           final AddressSettings addressSettings,
-                          final Executor executor,
+                          final ArtemisExecutor executor,
                           final boolean syncNonTransactional) {
       if (pagingManager == null) {
          throw new IllegalStateException("Paging Manager can't be null");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 192f25c..5951a01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -29,13 +29,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 /**
  * A queue that will discard messages if a newer message with the same
@@ -65,7 +65,7 @@ public class LastValueQueue extends QueueImpl {
                          final PostOffice postOffice,
                          final StorageManager storageManager,
                          final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                         final Executor executor,
+                         final ArtemisExecutor executor,
                          final ActiveMQServer server,
                          final QueueFactory factory) {
       super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ecfbf09..35dd5ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -32,7 +32,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -89,6 +88,7 @@ import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
@@ -118,7 +118,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    public static final int MAX_DELIVERIES_IN_LOOP = 1000;
 
-   public static final int CHECK_QUEUE_SIZE_PERIOD = 100;
+   public static final int CHECK_QUEUE_SIZE_PERIOD = 1000;
 
    /**
     * If The system gets slow for any reason, this is the maximum time a Delivery or
@@ -228,7 +228,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private int pos;
 
-   private final Executor executor;
+   private final ArtemisExecutor executor;
 
    private boolean internalQueue;
 
@@ -342,7 +342,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor,
+                    final ArtemisExecutor executor,
                     final ActiveMQServer server,
                     final QueueFactory factory) {
       this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
@@ -361,7 +361,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor,
+                    final ArtemisExecutor executor,
                     final ActiveMQServer server,
                     final QueueFactory factory) {
       this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
@@ -383,7 +383,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                     final PostOffice postOffice,
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                    final Executor executor,
+                    final ArtemisExecutor executor,
                     final ActiveMQServer server,
                     final QueueFactory factory) {
       super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);
@@ -654,19 +654,27 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             return;
          }
 
-         synchronized (directDeliveryGuard) {
-            // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
-            // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
-            // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
-            if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
-               lastDirectDeliveryCheck = System.currentTimeMillis();
+         if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Checking to re-enable direct deliver on queue " + this.getName());
+            }
+            lastDirectDeliveryCheck = System.currentTimeMillis();
+            synchronized (directDeliveryGuard) {
+               // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
+               // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
+               // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
 
-               if (intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) {
+               if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) {
                   // We must block on the executor to ensure any async deliveries have completed or we might get out of order
                   // deliveries
-                  if (flushExecutor() && flushDeliveriesInTransit()) {
-                     // Go into direct delivery mode
-                     directDeliver = supportsDirectDeliver;
+                  // Go into direct delivery mode
+                  directDeliver = supportsDirectDeliver;
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Setting direct deliverer to " + supportsDirectDeliver);
+                  }
+               } else {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Couldn't set direct deliver back");
                   }
                }
             }
@@ -773,7 +781,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public Executor getExecutor() {
+   public ArtemisExecutor getExecutor() {
       if (pageSubscription != null && pageSubscription.isPaging()) {
          // When in page mode, we don't want to have concurrent IO on the same PageStore
          return pageSubscription.getExecutor();
@@ -791,7 +799,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public boolean flushExecutor() {
-      boolean ok = internalFlushExecutor(10000);
+      boolean ok = internalFlushExecutor(10000, true);
 
       if (!ok) {
          ActiveMQServerLogger.LOGGER.errorFlushingExecutorsOnQueue();
@@ -800,14 +808,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return ok;
    }
 
-   private boolean internalFlushExecutor(long timeout) {
+   private boolean internalFlushExecutor(long timeout, boolean log) {
       FutureLatch future = new FutureLatch();
 
       getExecutor().execute(future);
 
       boolean result = future.await(timeout);
 
-      if (!result) {
+      if (log && !result) {
          ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout);
       }
       return result;
@@ -2344,7 +2352,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private void internalAddRedistributor(final Executor executor) {
+   private void internalAddRedistributor(final ArtemisExecutor executor) {
       // create the redistributor only once if there are no local consumers
       if (consumerSet.isEmpty() && redistributor == null) {
          if (logger.isTraceEnabled()) {
@@ -2745,9 +2753,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private void proceedDeliver(Consumer consumer, MessageReference reference) {
       try {
          consumer.proceedDeliver(reference);
-         deliveriesInTransit.countDown();
       } catch (Throwable t) {
-         deliveriesInTransit.countDown();
          ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
 
          synchronized (this) {
@@ -2761,6 +2767,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             // The message failed to be delivered, hence we try again
             addHead(reference, false);
          }
+      } finally {
+         deliveriesInTransit.countDown();
       }
    }
 
@@ -2949,9 +2957,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private class DelayedAddRedistributor implements Runnable {
 
-      private final Executor executor1;
+      private final ArtemisExecutor executor1;
 
-      DelayedAddRedistributor(final Executor executor) {
+      DelayedAddRedistributor(final ArtemisExecutor executor) {
          this.executor1 = executor;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index afe1403..40ccefc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -16,17 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import javax.management.MBeanServer;
 import java.lang.management.ManagementFactory;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import javax.management.MBeanServer;
-
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Message;
@@ -74,6 +72,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -236,7 +235,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
                              final PostOffice postOffice,
                              final StorageManager storageManager,
                              final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                             final Executor executor, final ActiveMQServer server) {
+                             final ArtemisExecutor executor, final ActiveMQServer server) {
             super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
                   maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
                   addressSettingsRepository, executor, server, null);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 66381fa..6056fcb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -16,19 +16,17 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
@@ -64,6 +62,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -518,7 +517,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
                         PostOffice postOffice,
                         StorageManager storageManager,
                         HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                        Executor executor) {
+                        ArtemisExecutor executor) {
             super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
                   postOffice, storageManager, addressSettingsRepository, executor, null, null);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 7eadeca..542d94d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -32,7 +32,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +85,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.jboss.logging.Logger;
 import org.junit.After;
 import org.junit.Assert;
@@ -3056,7 +3056,7 @@ public class PagingTest extends ActiveMQTestBase {
 
          InterruptedCursorProvider(PagingStore pagingStore,
                                    StorageManager storageManager,
-                                   Executor executor,
+                                   ArtemisExecutor executor,
                                    int maxCacheSize) {
             super(pagingStore, storageManager, executor, maxCacheSize);
          }
@@ -3082,7 +3082,7 @@ public class PagingTest extends ActiveMQTestBase {
                public PageCursorProvider newCursorProvider(PagingStore store,
                                                            StorageManager storageManager,
                                                            AddressSettings addressSettings,
-                                                           Executor executor) {
+                                                           ArtemisExecutor executor) {
                   return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
                }
             };

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
index b5dc463..ab36b33 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -70,7 +71,7 @@ public class QueueImplTest extends ActiveMQTestBase {
       QueueImpl queue =
                new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
                              false, scheduledExecutor, null, null, null,
-                             Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
+                             ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
 
       // Send one scheduled
 
@@ -135,7 +136,8 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    @Test
    public void testScheduled() throws Exception {
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
+      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null,
+                                      ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
 
       FakeConsumer consumer = null;
 
@@ -233,7 +235,8 @@ public class QueueImplTest extends ActiveMQTestBase {
          public void disconnect() {
          }
       };
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
+      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null,
+                                      ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.addConsumer(consumer);
       messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 4ddde1c..d261f64 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -67,6 +66,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -743,8 +743,8 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       return new ExecutorFactory() {
 
          @Override
-         public Executor getExecutor() {
-            return executor;
+         public ArtemisExecutor getExecutor() {
+            return ArtemisExecutor.delegate(executor);
          }
       };
    }
@@ -818,7 +818,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       public PageCursorProvider newCursorProvider(PagingStore store,
                                                   StorageManager storageManager,
                                                   AddressSettings addressSettings,
-                                                  Executor executor) {
+                                                  ArtemisExecutor executor) {
          return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 40f9214..0aa6e5c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOff
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.FutureLatch;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.junit.After;
 import org.junit.Assert;
@@ -1310,6 +1311,6 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
       return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
-                           new FakePostOffice(), null, null, executor, null, null);
+                           new FakePostOffice(), null, null, ArtemisExecutor.delegate(executor), null, null);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ace3061/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
index 4721579..88f0019 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.QueueConfig;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 
 public final class FakeQueueFactory implements QueueFactory {
 
@@ -42,7 +43,7 @@ public final class FakeQueueFactory implements QueueFactory {
    public Queue createQueueWith(final QueueConfig config) {
       return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(),
                            config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
-                           scheduledExecutor, postOffice, null, null, executor, null, this);
+                           scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this);
    }
 
    @Deprecated
@@ -57,7 +58,7 @@ public final class FakeQueueFactory implements QueueFactory {
                             final boolean temporary,
                             final boolean autoCreated) {
       return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated,
-                           scheduledExecutor, postOffice, null, null, executor, null, this);
+                           scheduledExecutor, postOffice, null, null, ArtemisExecutor.delegate(executor), null, this);
    }
 
    @Override

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/2] activemq-artemis git commit: This closes #1447

clebertsuconic-2
This closes #1447


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d4a7aebb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d4a7aebb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d4a7aebb

Branch: refs/heads/master
Commit: d4a7aebb6ddcf87c1c1455e43d378cdf44b41cde
Parents: 246b8ef 1ace306
Author: Clebert Suconic <[hidden email]>
Authored: Tue Aug 8 14:00:59 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Aug 8 14:00:59 2017 -0400

----------------------------------------------------------------------
 .../artemis/cli/commands/tools/PrintData.java   |  6 +-
 .../cli/commands/tools/xml/XmlDataExporter.java |  6 +-
 .../activemq/artemis/utils/ExecutorFactory.java |  4 +-
 .../artemis/utils/actors/ArtemisExecutor.java   | 69 ++++++++++++++++++++
 .../artemis/utils/actors/OrderedExecutor.java   |  2 +-
 .../utils/actors/OrderedExecutorFactory.java    |  2 +-
 .../artemis/utils/actors/ProcessorBase.java     |  4 ++
 .../artemis/core/io/JournalTptBenchmark.java    |  4 +-
 .../artemis/core/paging/PagingStoreFactory.java |  3 +-
 .../core/paging/cursor/PageSubscription.java    |  5 +-
 .../cursor/impl/PageCursorProviderImpl.java     |  6 +-
 .../cursor/impl/PageSubscriptionImpl.java       |  8 +--
 .../paging/impl/PagingStoreFactoryDatabase.java |  3 +-
 .../core/paging/impl/PagingStoreFactoryNIO.java |  3 +-
 .../core/paging/impl/PagingStoreImpl.java       |  6 +-
 .../core/server/impl/LastValueQueue.java        |  6 +-
 .../artemis/core/server/impl/QueueImpl.java     | 58 +++++++++-------
 .../integration/client/HangConsumerTest.java    |  7 +-
 .../client/InterruptedLargeMessageTest.java     | 15 ++---
 .../tests/integration/paging/PagingTest.java    |  6 +-
 .../timing/core/server/impl/QueueImplTest.java  |  9 ++-
 .../core/paging/impl/PagingStoreImplTest.java   |  8 +--
 .../unit/core/server/impl/QueueImplTest.java    |  3 +-
 .../server/impl/fakes/FakeQueueFactory.java     |  5 +-
 24 files changed, 167 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


Loading...