[1/2] activemq-artemis git commit: ARTEMIS-1328 Improving direct delivery check

Previous Topic Next Topic
 
Classic | List | Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/2] activemq-artemis git commit: ARTEMIS-1328 Improving direct delivery check

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 9672dc23e -> 0c5962f1b


ARTEMIS-1328 Improving direct delivery check

Based on #1447 as it is not possible to cherry-pick here


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/25c0f93a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/25c0f93a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/25c0f93a

Branch: refs/heads/1.x
Commit: 25c0f93ad50bc82f93cc2a6785203cb1ea366c40
Parents: 9672dc2
Author: Clebert Suconic <[hidden email]>
Authored: Mon Aug 7 23:48:29 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Aug 8 13:00:37 2017 -0400

----------------------------------------------------------------------
 .../artemis/utils/OrderedExecutorFactory.java   |  6 ++-
 .../artemis/core/server/impl/QueueImpl.java     | 56 +++++++++++++-------
 2 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25c0f93a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
index f4c85f3..10aa7f6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -75,7 +75,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
     * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
     * same method, will result in B's task running after A's.
     */
-   private static class OrderedExecutor implements Executor {
+   public static class OrderedExecutor implements Executor {
 
       private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
       private final Executor delegate;
@@ -104,6 +104,10 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
          }
       }
 
+      public boolean isFlushed() {
+         return stateUpdater.get(this) == STATE_NOT_RUNNING;
+      }
+
       private final class ExecutorTask implements Runnable {
 
          @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25c0f93a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 897fde3..8e338c0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -85,6 +85,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
@@ -107,7 +108,7 @@ public class QueueImpl implements Queue {
 
    public static final int MAX_DELIVERIES_IN_LOOP = 1000;
 
-   public static final int CHECK_QUEUE_SIZE_PERIOD = 100;
+   public static final int CHECK_QUEUE_SIZE_PERIOD = 1000;
 
    /**
     * If The system gets slow for any reason, this is the maximum time a Delivery or
@@ -534,24 +535,27 @@ public class QueueImpl implements Queue {
          return;
       }
 
-      synchronized (directDeliveryGuard) {
-         // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
-         // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
-         // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
-         if (!directDeliver &&
-            direct &&
-            System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
-            lastDirectDeliveryCheck = System.currentTimeMillis();
+      if (!directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Checking to re-enable direct deliver on queue " + this.getName());
+         }
+         lastDirectDeliveryCheck = System.currentTimeMillis();
+         synchronized (directDeliveryGuard) {
+            // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
+            // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
+            // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
 
-            if (intermediateMessageReferences.isEmpty() &&
-               messageReferences.isEmpty() &&
-               !pageIterator.hasNext() &&
-               !pageSubscription.isPaging()) {
+            if (deliveriesInTransit.getCount() == 0 && isFlushed(getExecutor()) && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) {
                // We must block on the executor to ensure any async deliveries have completed or we might get out of order
                // deliveries
-               if (flushExecutor() && flushDeliveriesInTransit()) {
-                  // Go into direct delivery mode
-                  directDeliver = true;
+               // Go into direct delivery mode
+               directDeliver = true;
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Setting direct deliverer to true");
+               }
+            } else {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Couldn't set direct deliver back");
                }
             }
          }
@@ -671,9 +675,23 @@ public class QueueImpl implements Queue {
       flushExecutor();
    }
 
+   private boolean isFlushed(Executor executor) {
+      if (executor instanceof OrderedExecutorFactory.OrderedExecutor) {
+         return ((OrderedExecutorFactory.OrderedExecutor)executor).isFlushed();
+      } else {
+         CountDownLatch latch = new CountDownLatch(1);
+         executor.execute(latch::countDown);
+         try {
+            return latch.await(100, TimeUnit.MILLISECONDS);
+         } catch (InterruptedException e) {
+            return false;
+         }
+      }
+   }
+
    @Override
    public boolean flushExecutor() {
-      boolean ok = internalFlushExecutor(10000);
+      boolean ok = internalFlushExecutor(10000, true);
 
       if (!ok) {
          ActiveMQServerLogger.LOGGER.errorFlushingExecutorsOnQueue();
@@ -682,14 +700,14 @@ public class QueueImpl implements Queue {
       return ok;
    }
 
-   private boolean internalFlushExecutor(long timeout) {
+   private boolean internalFlushExecutor(long timeout, boolean log) {
       FutureLatch future = new FutureLatch();
 
       getExecutor().execute(future);
 
       boolean result = future.await(timeout);
 
-      if (!result) {
+      if (log && !result) {
          ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout);
       }
       return result;

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/2] activemq-artemis git commit: This closes #1448

clebertsuconic-2
This closes #1448


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0c5962f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0c5962f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0c5962f1

Branch: refs/heads/1.x
Commit: 0c5962f1b8ce54f98b13498b64f849b1e5762204
Parents: 9672dc2 25c0f93
Author: Clebert Suconic <[hidden email]>
Authored: Tue Aug 8 14:01:36 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Aug 8 14:01:36 2017 -0400

----------------------------------------------------------------------
 .../artemis/utils/OrderedExecutorFactory.java   |  6 ++-
 .../artemis/core/server/impl/QueueImpl.java     | 56 +++++++++++++-------
 2 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


Loading...