[1/7] activemq-artemis git commit: This closes #1650

Previous Topic | Next Topic
 
Classic | List | Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/7] activemq-artemis git commit: This closes #1650

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master c2a21c974 -> ead60d54d


This closes #1650


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ead60d54
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ead60d54
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ead60d54

Branch: refs/heads/master
Commit: ead60d54d094e470033b65074e85fa143caf2300
Parents: c2a21c9 33b3eb6
Author: Clebert Suconic <[hidden email]>
Authored: Thu Nov 9 11:58:36 2017 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ArtemisExecutor.java   |  44 +++--
 .../artemis/utils/actors/HandlerBase.java       |  47 +++++
 .../artemis/utils/actors/ProcessorBase.java     | 196 +++++++++++++++----
 .../utils/actors/OrderedExecutorSanityTest.java | 148 ++++++++++++++
 .../cursor/impl/PageCursorProviderImpl.java     |   2 +-
 .../core/paging/impl/PagingStoreImpl.java       |   2 +-
 .../core/ServerSessionPacketHandler.java        |  83 +++-----
 .../protocol/core/impl/CoreSessionCallback.java |   2 +-
 .../management/impl/ManagementServiceImpl.java  |   4 +-
 .../artemis/tests/util/ActiveMQTestBase.java    |   2 +-
 .../tests/integration/client/ConsumerTest.java  | 126 +++++++++++-
 .../jms/consumer/JmsConsumerTest.java           |  11 +-
 12 files changed, 542 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


Reply | Threaded
Open this post in threaded view
|

[2/7] activemq-artemis git commit: ARTEMIS-1495 Fixing In Handler executor and added benchmark to measure impact of changes

clebertsuconic-2
ARTEMIS-1495 Fixing In Handler executor and added benchmark to measure impact of changes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/91db0807
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/91db0807
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/91db0807

Branch: refs/heads/master
Commit: 91db08072b221885f246a9db70abf3ee0bdf170d
Parents: 0fadc68
Author: Clebert Suconic <[hidden email]>
Authored: Wed Nov 8 09:16:59 2017 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ArtemisExecutor.java   |  10 +-
 .../artemis/utils/actors/HandlerBase.java       |  47 ++++++
 .../artemis/utils/actors/ProcessorBase.java     | 153 +++++++++++++------
 .../utils/actors/OrderedExecutorSanityTest.java |  69 ++++++++-
 .../core/ServerSessionPacketHandler.java        |  73 +++------
 .../artemis/tests/util/ActiveMQTestBase.java    |   2 +-
 .../tests/integration/client/ConsumerTest.java  |   6 +-
 7 files changed, 260 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 5e72ef2..8efb3d3 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -40,9 +42,15 @@ public interface ArtemisExecutor extends Executor {
 
    /** It will wait the current execution (if there is one) to finish
     *  but will not complete any further executions */
-   default void shutdownNow() {
+   default List<Runnable> shutdownNow() {
+      return Collections.emptyList();
    }
 
+
+   default void shutdown() {
+   }
+
+
    /**
     * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
     * @return

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
new file mode 100644
index 0000000..6bfbcb4
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+/**
+ * This abstract class will encapsulate
+ * ThreadLocals to determine when a class is a handler.
+ * This is because some functionality has to be avoided if inHandler().
+ *
+ */
+public abstract class HandlerBase {
+
+   //marker instance used to recognize if a thread is performing a packet handling
+   private static final Object DUMMY = Boolean.TRUE;
+
+   // this cannot be static as the Actor will be used within another executor. For that reason
+   // each instance will have its own ThreadLocal.
+   // ... a thread that has its thread-local map populated with DUMMY while performing a handler
+   private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
+
+   protected void enter() {
+      assert inHandler.get() == null : "should be null";
+      inHandler.set(DUMMY);
+   }
+
+   public boolean inHandler() {
+      final Object dummy = inHandler.get();
+      return dummy != null;
+   }
+
+   protected void leave() {
+      assert inHandler.get() != null : "marker not set";
+      inHandler.set(null);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 73dbf2f..1c77a52 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -17,17 +17,24 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
 
-public abstract class ProcessorBase<T> {
+import org.jboss.logging.Logger;
 
-   private static final int STATE_NOT_RUNNING = 0;
-   private static final int STATE_RUNNING = 1;
-   private static final int STATE_FORCED_SHUTDOWN = 2;
+public abstract class ProcessorBase<T> extends HandlerBase {
+
+   private static final Logger logger = Logger.getLogger(ProcessorBase.class);
+
+   public static final int STATE_NOT_RUNNING = 0;
+   public static final int STATE_RUNNING = 1;
+   public static final int STATE_FORCED_SHUTDOWN = 2;
 
    protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
 
@@ -41,6 +48,8 @@ public abstract class ProcessorBase<T> {
 
    private volatile boolean requestedShutdown = false;
 
+   private volatile boolean started = true;
+
    private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
    private final class ExecutorTask implements Runnable {
@@ -50,19 +59,23 @@ public abstract class ProcessorBase<T> {
          do {
             //if there is no thread active and is not already dead then we run
             if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
+               enter();
                try {
                   T task = tasks.poll();
                   //while the queue is not empty we process in order
-                  while (task != null) {
+                  while (task != null && !requestedShutdown) {
                      //just drain the tasks if has been requested a shutdown to help the shutdown process
-                     if (!requestedShutdown) {
-                        doTask(task);
+                     if (requestedShutdown) {
+                        tasks.add(task);
+                        break;
                      }
+                     doTask(task);
                      task = tasks.poll();
                   }
                } finally {
+                  leave();
                   //set state back to not running.
-                  stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
+                  stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING);
                }
             } else {
                return;
@@ -75,31 +88,57 @@ public abstract class ProcessorBase<T> {
       }
    }
 
-   /** It will wait the current execution (if there is one) to finish
-    *  but will not complete any further executions */
-   public void shutdownNow() {
+   /**
+    * It will shutdown and wait 30 seconds for timeout.
+    */
+   public void shutdown() {
+      shutdown(30, TimeUnit.SECONDS);
+   }
+
+   public void shutdown(long timeout, TimeUnit unit) {
+      started = false;
+
+      if (!inHandler()) {
+         // if it's in handler.. we just return
+         flush(timeout, unit);
+      }
+   }
+
+   /**
+    * It will wait the current execution (if there is one) to finish
+    * but will not complete any further executions
+    */
+   public List<T> shutdownNow() {
       //alert anyone that has been requested (at least) an immediate shutdown
       requestedShutdown = true;
-      //it could take a very long time depending on the current executing task
-      do {
-         //alert the ExecutorTask (if is running) to just drain the current backlog of tasks
-         final int startState = stateUpdater.get(this);
-         if (startState == STATE_FORCED_SHUTDOWN) {
-            //another thread has completed a forced shutdown
-            return;
-         }
-         if (startState == STATE_RUNNING) {
-            //wait 100 ms to avoid burning CPU while waiting and
-            //give other threads a chance to make progress
-            LockSupport.parkNanos(100_000_000L);
+      started = false;
+
+      if (inHandler()) {
+         stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
+      } else {
+         //it could take a very long time depending on the current executing task
+         do {
+            //alert the ExecutorTask (if is running) to just drain the current backlog of tasks
+            final int startState = stateUpdater.get(this);
+            if (startState == STATE_FORCED_SHUTDOWN) {
+               //another thread has completed a forced shutdown
+               break;
+            }
+            if (startState == STATE_RUNNING) {
+               //wait 100 ms to avoid burning CPU while waiting and
+               //give other threads a chance to make progress
+               LockSupport.parkNanos(100_000_000L);
+            }
          }
+         while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
+         //this could happen just one time: the forced shutdown state is the last one and
+         //can be set by just one caller.
+         //As noted on the execute method there is a small chance that some tasks would be enqueued
       }
-      while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
-      //this could happen just one time: the forced shutdown state is the last one and
-      //can be set by just one caller.
-      //As noted on the execute method there is a small chance that some tasks would be enqueued
+      ArrayList<T> returnList = new ArrayList<>(tasks);
       tasks.clear();
-      //we can report the killed tasks somehow: ExecutorService do the same on shutdownNow
+
+      return returnList;
    }
 
    protected abstract void doTask(T task);
@@ -112,26 +151,48 @@ public abstract class ProcessorBase<T> {
       return stateUpdater.get(this) == STATE_NOT_RUNNING;
    }
 
-   protected void task(T command) {
-      if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) {
-         //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
-         tasks.add(command);
-         //cache locally the state to avoid multiple volatile loads
-         final int state = stateUpdater.get(this);
-         if (state == STATE_FORCED_SHUTDOWN) {
-            //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
-            tasks.clear();
-         } else if (state == STATE_NOT_RUNNING) {
-            //startPoller could be deleted but is maintained because is inherited
-            delegate.execute(task);
+   /**
+    * WARNING: This will only flush when all the activity is suspended.
+    * don't expect success on this call if another thread keeps feeding the queue
+    * this is only valid on situations where you are not feeding the queue,
+    * like in shutdown and failover situations.
+    */
+   public final boolean flush(long timeout, TimeUnit unit) {
+      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+         // quick test, most of the time it will be empty anyways
+         return true;
+      }
+
+      long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
+      try {
+         while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
+
+            if (tasks.isEmpty()) {
+               return true;
+            }
+
+            Thread.sleep(10);
          }
+      } catch (InterruptedException e) {
+         // ignored
       }
+
+      return stateUpdater.get(this) == STATE_NOT_RUNNING;
    }
 
-   protected void startPoller() {
-      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
-         //note that this can result in multiple tasks being queued
-         //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
+   protected void task(T command) {
+      if (!started) {
+         logger.debug("Ordered executor has been shutdown at", new Exception("debug"));
+      }
+      //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
+      tasks.add(command);
+      //cache locally the state to avoid multiple volatile loads
+      final int state = stateUpdater.get(this);
+      if (state == STATE_FORCED_SHUTDOWN) {
+         //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
+         tasks.clear();
+      } else if (state == STATE_NOT_RUNNING) {
+         //startPoller could be deleted but is maintained because is inherited
          delegate.execute(task);
       }
    }
@@ -146,4 +207,8 @@ public abstract class ProcessorBase<T> {
       return tasks.size();
    }
 
+   public final int status() {
+      return stateUpdater.get(this);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
index 9446f50..4e2bbba 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,7 +71,7 @@ public class OrderedExecutorSanityTest {
          //from now on new tasks won't be executed
          final CountDownLatch afterDeatchExecution = new CountDownLatch(1);
          executor.execute(afterDeatchExecution::countDown);
-         Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(1, TimeUnit.SECONDS));
+         Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(100, TimeUnit.MILLISECONDS));
          //to avoid memory leaks the executor must take care of the new submitted tasks immediatly
          Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining());
       } finally {
@@ -78,4 +79,70 @@ public class OrderedExecutorSanityTest {
       }
    }
 
+
+
+   @Test
+   public void shutdownWithin() throws InterruptedException {
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      try {
+         final OrderedExecutor executor = new OrderedExecutor(executorService);
+         final CountDownLatch latch = new CountDownLatch(1);
+         final AtomicInteger numberOfTasks = new AtomicInteger(0);
+         final CountDownLatch ran = new CountDownLatch(1);
+
+         executor.execute(() -> {
+            try {
+               latch.await(1, TimeUnit.MINUTES);
+               numberOfTasks.set(executor.shutdownNow().size());
+               ran.countDown();
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         });
+
+
+         for (int i = 0; i < 100; i++) {
+            executor.execute(() -> System.out.println("Dont worry, this will never happen"));
+         }
+
+         latch.countDown();
+         ran.await(1, TimeUnit.SECONDS);
+         Assert.assertEquals(100, numberOfTasks.get());
+
+         Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status());
+         Assert.assertEquals(0, executor.remaining());
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
+
+   @Test
+   public void testMeasure() throws InterruptedException {
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      try {
+         final OrderedExecutor executor = new OrderedExecutor(executorService);
+         int MAX_LOOP = 1_000_000;
+
+         // extend the number for longer numbers
+         int runs = 10;
+
+         for (int i = 0; i < runs; i++) {
+            long start = System.nanoTime();
+            final CountDownLatch executed = new CountDownLatch(MAX_LOOP);
+            for (int l = 0; l < MAX_LOOP; l++) {
+               executor.execute(executed::countDown);
+            }
+            Assert.assertTrue(executed.await(1, TimeUnit.MINUTES));
+            long end = System.nanoTime();
+
+            long elapsed = (end - start);
+
+            System.out.println("execution " + i + " in " + TimeUnit.NANOSECONDS.toMillis(elapsed) + " milliseconds");
+         }
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index f78f43f..e1e1b68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -159,11 +159,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final boolean direct;
 
-   //marker instance used to recognize if a thread is performing a packet handling
-   private static final Object DUMMY = Boolean.TRUE;
-
-   //a thread that has its thread-local map populated with DUMMY is performing a packet handling
-   private static final ThreadLocal<Object> inHandler = new ThreadLocal<>();
 
    public ServerSessionPacketHandler(final ActiveMQServer server,
                                      final CoreProtocolManager manager,
@@ -231,26 +226,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
    }
 
-   private static void onStartMessagePacketHandler() {
-      assert inHandler.get() == null : "recursion on packet handling is not supported";
-      inHandler.set(DUMMY);
-   }
-
-   private static boolean inHandler() {
-      final Object dummy = inHandler.get();
-      //sanity check: can't exist a thread using a marker different from DUMMY
-      assert ((dummy != null && dummy == DUMMY) || dummy == null) : "wrong marker";
-      return dummy != null;
-   }
-
-   private static void onExitMessagePacketHandler() {
-      assert inHandler.get() != null : "marker not set";
-      inHandler.set(null);
-   }
-
    public void closeExecutors() {
-      packetActor.shutdownNow();
-      callExecutor.shutdownNow();
+      packetActor.shutdown();
+      callExecutor.shutdown();
    }
 
    public void close() {
@@ -280,33 +258,28 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       if (logger.isTraceEnabled()) {
          logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
       }
-      onStartMessagePacketHandler();
-      try {
-         final byte type = packet.getType();
-         switch (type) {
-            case SESS_SEND: {
-               onSessionSend(packet);
-               break;
-            }
-            case SESS_ACKNOWLEDGE: {
-               onSessionAcknowledge(packet);
-               break;
-            }
-            case SESS_PRODUCER_REQUEST_CREDITS: {
-               onSessionRequestProducerCredits(packet);
-               break;
-            }
-            case SESS_FLOWTOKEN: {
-               onSessionConsumerFlowCredit(packet);
-               break;
-            }
-            default:
-               // separating a method for everything else as JIT was faster this way
-               slowPacketHandler(packet);
-               break;
+      final byte type = packet.getType();
+      switch (type) {
+         case SESS_SEND: {
+            onSessionSend(packet);
+            break;
          }
-      } finally {
-         onExitMessagePacketHandler();
+         case SESS_ACKNOWLEDGE: {
+            onSessionAcknowledge(packet);
+            break;
+         }
+         case SESS_PRODUCER_REQUEST_CREDITS: {
+            onSessionRequestProducerCredits(packet);
+            break;
+         }
+         case SESS_FLOWTOKEN: {
+            onSessionConsumerFlowCredit(packet);
+            break;
+         }
+         default:
+            // separating a method for everything else as JIT was faster this way
+            slowPacketHandler(packet);
+            break;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index a63eec7..2d6f003 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -302,7 +302,7 @@ public abstract class ActiveMQTestBase extends Assert {
          //clean up pools before failing
          if (!exceptions.isEmpty()) {
             for (Exception exception : exceptions) {
-               exception.printStackTrace();
+               exception.printStackTrace(System.out);
             }
             fail("Client Session Factories still trying to reconnect, see above to see where created");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 9c05114..ef53344 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -330,9 +330,9 @@ public class ConsumerTest extends ActiveMQTestBase {
          connection.close();
       }
 
-      assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
-      assertNull(server.locateQueue(SimpleString.toSimpleString("queue")));
-      assertEquals(0, server.getTotalMessageCount());
+      Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString("queue")) == null);
+      Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString("queue")) == null);
+      Wait.assertEquals(0, server::getTotalMessageCount);
    }
 
    @Test

Reply | Threaded
Open this post in threaded view
|

[3/7] activemq-artemis git commit: ARTEMIS-1495 Removing flushes from codebase

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1495 Removing flushes from codebase

Instead of flushing we just need to make sure there are no more calls into
page executors as we stop the PageManager.

This will avoid any possible starvations or deadlocks here.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e6176a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e6176a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e6176a6

Branch: refs/heads/master
Commit: 2e6176a69333367c165ed463437d6302f3f8da9e
Parents: 8bf879f
Author: Clebert Suconic <[hidden email]>
Authored: Tue Nov 7 14:52:19 2017 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ArtemisExecutor.java   | 23 +++----
 .../artemis/utils/actors/ProcessorBase.java     | 68 +++++++++-----------
 .../cursor/impl/PageCursorProviderImpl.java     |  2 +-
 .../core/paging/impl/PagingStoreImpl.java       |  2 +-
 .../core/ServerSessionPacketHandler.java        | 14 ++--
 .../protocol/core/impl/CoreSessionCallback.java |  2 +-
 .../management/impl/ManagementServiceImpl.java  |  4 +-
 .../jms/consumer/JmsConsumerTest.java           | 11 ++--
 8 files changed, 58 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index d3036ec..5e72ef2 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -25,7 +25,7 @@ public interface ArtemisExecutor extends Executor {
 
    /**
     * Artemis is supposed to implement this properly, however in tests or tools
-    * this can be used as a fake, doing a sipmle delegate and using the default methods implemented here.
+    * this can be used as a fake, doing a simple delegate and using the default methods implemented here.
     * @param executor
     * @return
     */
@@ -38,11 +38,16 @@ public interface ArtemisExecutor extends Executor {
       };
    }
 
-   default boolean flush() {
-      return flush(30, TimeUnit.SECONDS);
+   /** It will wait the current execution (if there is one) to finish
+    *  but will not complete any further executions */
+   default void shutdownNow() {
    }
 
-   default boolean flush(long timeout, TimeUnit unit) {
+   /**
+    * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
+    * @return
+    */
+   default boolean isFlushed() {
       CountDownLatch latch = new CountDownLatch(1);
       Runnable runnable = new Runnable() {
          @Override
@@ -52,18 +57,10 @@ public interface ArtemisExecutor extends Executor {
       };
       execute(runnable);
       try {
-         return latch.await(timeout, unit);
+         return latch.await(100, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
          return false;
       }
    }
 
-   /**
-    * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
-    * @return
-    */
-   default boolean isFlushed() {
-      return flush(100, TimeUnit.MILLISECONDS);
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index dbc0776..44b2916 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.utils.actors;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public abstract class ProcessorBase<T> {
@@ -34,6 +33,9 @@ public abstract class ProcessorBase<T> {
 
    private final ExecutorTask task = new ExecutorTask();
 
+   private final Object startedGuard = new Object();
+   private volatile boolean started = true;
+
    // used by stateUpdater
    @SuppressWarnings("unused")
    private volatile int state = 0;
@@ -49,8 +51,18 @@ public abstract class ProcessorBase<T> {
             if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
                T task = tasks.poll();
                //while the queue is not empty we process in order
-               while (task != null) {
-                  doTask(task);
+
+               // All we care about with started is that a current task is not running as we call shutdown.
+               // for that reason this first run doesn't need to be under any lock
+               while (task != null && started) {
+
+                  // Synchronized here is just to guarantee that a current task is finished before
+                  // the started update can be taken as false
+                  synchronized (startedGuard) {
+                     if (started) {
+                        doTask(task);
+                     }
+                  }
                   task = tasks.poll();
                }
                //set state back to not running.
@@ -66,52 +78,32 @@ public abstract class ProcessorBase<T> {
       }
    }
 
+   /** It will wait for the current execution (if there is one) to finish
+    *  but will not complete any further executions */
+   public void shutdownNow() {
+      synchronized (startedGuard) {
+         started = false;
+      }
+      tasks.clear();
+   }
+
    protected abstract void doTask(T task);
 
    public ProcessorBase(Executor parent) {
       this.delegate = parent;
    }
 
-   public final boolean flush() {
-      return flush(30, TimeUnit.SECONDS);
-   }
-
-   /**
-    * WARNING: This will only flush when all the activity is suspended.
-    *          don't expect success on this call if another thread keeps feeding the queue
-    *          this is only valid on situations where you are not feeding the queue,
-    *          like in shutdown and failover situations.
-    * */
-   public final boolean flush(long timeout, TimeUnit unit) {
-      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
-         // quick test, most of the time it will be empty anyways
-         return true;
-      }
-
-      long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
-      try {
-         while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
-
-            if (tasks.isEmpty()) {
-               return true;
-            }
-
-            Thread.sleep(10);
-         }
-      } catch (InterruptedException e) {
-         // ignored
-      }
-
-      return stateUpdater.get(this) == STATE_NOT_RUNNING;
-   }
-
    public final boolean isFlushed() {
       return stateUpdater.get(this) == STATE_NOT_RUNNING;
    }
 
    protected void task(T command) {
-      tasks.add(command);
-      startPoller();
+      // There is no need to verify the lock here.
+      // you can only turn off running once
+      if (started) {
+         tasks.add(command);
+         startPoller();
+      }
    }
 
    protected void startPoller() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 2030d25..45b2c1d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -240,7 +240,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          cursor.stop();
       }
 
-      waitForFuture();
+      executor.shutdownNow();
    }
 
    private void waitForFuture() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 6f85aa2..f1beb31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -352,7 +352,7 @@ public class PagingStoreImpl implements PagingStore {
 
          running = false;
 
-         flushExecutors();
+         executor.shutdownNow();
 
          if (currentPage != null) {
             currentPage.close(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 75ef071..f78f43f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -220,7 +220,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
       ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
 
-      flushExecutor();
+      closeExecutors();
 
       try {
          session.close(true);
@@ -248,15 +248,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       inHandler.set(null);
    }
 
-   public void flushExecutor() {
-      if (!inHandler()) {
-         packetActor.flush();
-         callExecutor.flush();
-      }
+   public void closeExecutors() {
+      packetActor.shutdownNow();
+      callExecutor.shutdownNow();
    }
 
    public void close() {
-      flushExecutor();
+      closeExecutors();
 
       channel.flushConfirmations();
 
@@ -895,8 +893,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             remotingConnection.removeFailureListener((FailureListener) closeListener);
          }
       }
-
-      flushExecutor();
    }
 
    public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 8168e6e..92b3768 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -67,7 +67,7 @@ public final class CoreSessionCallback implements SessionCallback {
       ServerSessionPacketHandler localHandler = handler;
       if (localHandler != null) {
          // We wait any pending tasks before we make this as closed
-         localHandler.flushExecutor();
+         localHandler.closeExecutors();
       }
       this.handler = null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index e94e40b..81a8e84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -257,7 +257,9 @@ public class ManagementServiceImpl implements ManagementService {
       ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
       unregisterFromJMX(objectName);
       unregisterFromRegistry(ResourceNames.QUEUE + name);
-      messageCounterManager.unregisterMessageCounter(name.toString());
+      if (messageCounterManager != null) {
+         messageCounterManager.unregisterMessageCounter(name.toString());
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
index 5cefbd0..93229e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.junit.Assert;
 import org.junit.Before;
@@ -303,8 +304,9 @@ public class JmsConsumerTest extends JMSTestBase {
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
       conn.close();
 
-      Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
-      Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+      Queue queue = server.locateQueue(queueName);
+      Wait.assertEquals(0, queue::getDeliveringCount);
+      Wait.assertEquals(0, queue::getMessageCount);
    }
 
    @Test
@@ -329,8 +331,9 @@ public class JmsConsumerTest extends JMSTestBase {
 
       // Messages should all have been acked since we set pre ack on the cf
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
-      Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
-      Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+      Queue queue = server.locateQueue(queueName);
+      Wait.assertEquals(0, queue::getDeliveringCount);
+      Wait.assertEquals(0, queue::getMessageCount);
    }
 
    @Test

Reply | Threaded
Open this post in threaded view
|

[4/7] activemq-artemis git commit: ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8bf879f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8bf879f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8bf879f1

Branch: refs/heads/master
Commit: 8bf879f1560b958907bf8f6808bc66b8f2402431
Parents: c2a21c9
Author: Francesco Nigro <[hidden email]>
Authored: Thu Nov 2 10:51:43 2017 +0100
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../tests/integration/client/ConsumerTest.java  | 120 +++++++++++++++++++
 1 file changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8bf879f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index af172c8..9c05114 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -31,9 +32,13 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.stream.Stream;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -1074,4 +1079,119 @@ public class ConsumerTest extends ActiveMQTestBase {
       session.close();
    }
 
+   @Test
+   public void testMultipleConsumersOnSharedQueue() throws Throwable {
+      if (!isNetty() || this.durable) {
+         return;
+      }
+      final boolean durable = false;
+      final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
+      final int forks = 100;
+      final int queues = forks;
+      final int runs = 1;
+      final int messages = 1;
+      final ConnectionFactory factorySend = createFactory(1);
+      final AtomicLongArray receivedMessages = new AtomicLongArray(forks);
+      final Thread[] producersRunners = new Thread[forks];
+      final Thread[] consumersRunners = new Thread[forks];
+      //parties are forks (1 producer 1 consumer) + 1 controller in the main test thread
+      final CyclicBarrier onStartRun = new CyclicBarrier((forks * 2) + 1);
+      final CyclicBarrier onFinishRun = new CyclicBarrier((forks * 2) + 1);
+
+      final int messagesSent = forks * messages;
+      final AtomicInteger messagesRecieved = new AtomicInteger(0);
+
+      for (int i = 0; i < forks; i++) {
+         final int forkIndex = i;
+         final String queueName = "q_" + (forkIndex % queues);
+         final Thread producerRunner = new Thread(() -> {
+            try (Connection connection = factorySend.createConnection()) {
+               connection.start();
+               try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                  final javax.jms.Queue queue = session.createQueue(queueName);
+                  try (MessageProducer producer = session.createProducer(queue)) {
+                     producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                     for (int r = 0; r < runs; r++) {
+                        onStartRun.await();
+                        for (int m = 0; m < messages; m++) {
+                           final BytesMessage bytesMessage = session.createBytesMessage();
+                           bytesMessage.writeInt(forkIndex);
+                           producer.send(bytesMessage);
+                        }
+                        onFinishRun.await();
+                     }
+                  } catch (InterruptedException | BrokenBarrierException e) {
+                     e.printStackTrace();
+                  }
+               }
+            } catch (JMSException e) {
+               e.printStackTrace();
+            }
+         });
+
+         producerRunner.setDaemon(true);
+
+         final Thread consumerRunner = new Thread(() -> {
+            try (Connection connection = factorySend.createConnection()) {
+               connection.start();
+               try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                  final javax.jms.Queue queue = session.createQueue(queueName);
+                  try (MessageConsumer consumer = session.createConsumer(queue)) {
+                     for (int r = 0; r < runs; r++) {
+                        onStartRun.await();
+                        while (messagesRecieved.get() != messagesSent) {
+                           final BytesMessage receivedMessage = (BytesMessage) consumer.receive(1000);
+                           if (receivedMessage != null) {
+                              final int receivedConsumerIndex = receivedMessage.readInt();
+                              receivedMessages.getAndIncrement(receivedConsumerIndex);
+                              messagesRecieved.incrementAndGet();
+                           }
+                        }
+                        onFinishRun.await();
+                     }
+                  } catch (InterruptedException e) {
+                     e.printStackTrace();
+                  } catch (BrokenBarrierException e) {
+                     e.printStackTrace();
+                  }
+               }
+            } catch (JMSException e) {
+               e.printStackTrace();
+            }
+         });
+         consumerRunner.setDaemon(true);
+         consumersRunners[forkIndex] = consumerRunner;
+         producersRunners[forkIndex] = producerRunner;
+      }
+      Stream.of(consumersRunners).forEach(Thread::start);
+      Stream.of(producersRunners).forEach(Thread::start);
+      final long messagesPerRun = (forks * messages);
+      for (int r = 0; r < runs; r++) {
+         onStartRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+         System.out.println("started run " + r);
+         final long start = System.currentTimeMillis();
+         onFinishRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+         final long elapsedMillis = System.currentTimeMillis() - start;
+         System.out.println((messagesPerRun * 1000L) / elapsedMillis + " msg/sec");
+      }
+      Stream.of(producersRunners).forEach(runner -> {
+         try {
+            runner.join(TIMEOUT_MILLIS * runs);
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+      });
+      Stream.of(producersRunners).forEach(Thread::interrupt);
+      Stream.of(consumersRunners).forEach(runner -> {
+         try {
+            runner.join(TIMEOUT_MILLIS * runs);
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+      });
+      Stream.of(consumersRunners).forEach(Thread::interrupt);
+      for (int i = 0; i < forks; i++) {
+         Assert.assertEquals("The consumer " + i + " must receive all the messages sent.", messages * runs, receivedMessages.get(i));
+      }
+   }
 }

Reply | Threaded
Open this post in threaded view
|

[5/7] activemq-artemis git commit: ARTEMIS-1495 Sanity tests for the ProcessorBase::shutdownNow feature

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1495 Sanity tests for the ProcessorBase::shutdownNow feature


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c5b57f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c5b57f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c5b57f1

Branch: refs/heads/master
Commit: 3c5b57f1e9ed2f3b71df9b38e9bcbd3be7e31d0a
Parents: 2e6176a
Author: Francesco Nigro <[hidden email]>
Authored: Wed Nov 8 10:05:35 2017 +0100
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ProcessorBase.java     | 10 +++
 .../utils/actors/OrderedExecutorSanityTest.java | 81 ++++++++++++++++++++
 2 files changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c5b57f1/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 44b2916..8d19c22 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -114,4 +114,14 @@ public abstract class ProcessorBase<T> {
       }
    }
 
+   /**
+    * Returns the remaining items to be processed.
+    * <p>
+    * This method is safe to be called by different threads and its accuracy is subject to concurrent modifications.<br>
+    * It is meant to be used only for test purposes, because of its {@code O(n)} cost.
+    */
+   public final int remaining() {
+      return tasks.size();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c5b57f1/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
new file mode 100644
index 0000000..9446f50
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OrderedExecutorSanityTest {
+
+   @Test
+   public void shouldExecuteTasksInOrder() throws InterruptedException {
+      final int threads = 3;
+      final int tasks = 100;
+      final long timeoutMillis = TimeUnit.SECONDS.toMillis(10);
+      final ExecutorService executorService = Executors.newFixedThreadPool(threads);
+      try {
+         final ArtemisExecutor executor = new OrderedExecutor(executorService);
+         //it can be not thread safe too
+         final List<Integer> results = new ArrayList<>(tasks);
+         final List<Integer> expectedResults = new ArrayList<>(tasks);
+         final CountDownLatch executed = new CountDownLatch(tasks);
+         for (int i = 0; i < tasks; i++) {
+            final int value = i;
+            executor.execute(() -> {
+               results.add(value);
+               executed.countDown();
+            });
+            expectedResults.add(value);
+         }
+         Assert.assertTrue("The tasks must be executed in " + timeoutMillis + " ms", executed.await(timeoutMillis, TimeUnit.MILLISECONDS));
+         Assert.assertArrayEquals("The processing of tasks must be ordered", expectedResults.toArray(), results.toArray());
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
+   @Test
+   public void shouldShutdownNowDoNotExecuteFurtherTasks() throws InterruptedException {
+      final long timeoutMillis = TimeUnit.SECONDS.toMillis(10);
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      try {
+         final OrderedExecutor executor = new OrderedExecutor(executorService);
+         final CountDownLatch executed = new CountDownLatch(1);
+         executor.execute(executed::countDown);
+         Assert.assertTrue("The task must be executed in " + timeoutMillis + " ms", executed.await(timeoutMillis, TimeUnit.MILLISECONDS));
+         executor.shutdownNow();
+         Assert.assertEquals("There are no remaining tasks to be executed", 0, executor.remaining());
+         //from now on new tasks won't be executed
+         final CountDownLatch afterDeatchExecution = new CountDownLatch(1);
+         executor.execute(afterDeatchExecution::countDown);
+         Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(1, TimeUnit.SECONDS));
+         //to avoid memory leaks the executor must take care of the new submitted tasks immediately
+         Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining());
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
+}

Reply | Threaded
Open this post in threaded view
|

[6/7] activemq-artemis git commit: ARTEMIS-1495 Few perf improvements to: - reduce volatile loads - allow method inlining for hot execution paths - reduced pointers chasing due to inner classes uses

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1495 Few perf improvements to:
 - reduce volatile loads
 - allow method inlining for hot execution paths
 - reduced pointers chasing due to inner classes uses


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/33b3eb6f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/33b3eb6f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/33b3eb6f

Branch: refs/heads/master
Commit: 33b3eb6f095da4a21648c268c7a960e55f414ca3
Parents: 91db080
Author: Francesco Nigro <[hidden email]>
Authored: Thu Nov 9 11:26:21 2017 +0100
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ArtemisExecutor.java   |  25 +++-
 .../artemis/utils/actors/ProcessorBase.java     | 135 +++++++++++--------
 .../utils/actors/OrderedExecutorSanityTest.java |   4 +-
 3 files changed, 99 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 8efb3d3..9903d65 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -17,11 +17,10 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 public interface ArtemisExecutor extends Executor {
 
@@ -40,10 +39,24 @@ public interface ArtemisExecutor extends Executor {
       };
    }
 
-   /** It will wait the current execution (if there is one) to finish
-    *  but will not complete any further executions */
-   default List<Runnable> shutdownNow() {
-      return Collections.emptyList();
+   /**
+    * It will wait for the current execution (if there is one) to finish
+    * but will not complete any further executions.
+    *
+    * @param onPendingTask it will be called for each pending task found
+    * @return the number of pending tasks that won't be executed
+    */
+   default int shutdownNow(Consumer<? super Runnable> onPendingTask) {
+      return 0;
+   }
+
+   /**
+    * It will wait for the current execution (if there is one) to finish
+    * but will not complete any further executions
+    */
+   default int shutdownNow() {
+      return shutdownNow(t -> {
+      });
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 1c77a52..ff6d9a1 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -17,21 +17,19 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
 
 import org.jboss.logging.Logger;
 
 public abstract class ProcessorBase<T> extends HandlerBase {
 
    private static final Logger logger = Logger.getLogger(ProcessorBase.class);
-
    public static final int STATE_NOT_RUNNING = 0;
    public static final int STATE_RUNNING = 1;
    public static final int STATE_FORCED_SHUTDOWN = 2;
@@ -39,53 +37,50 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
 
    private final Executor delegate;
-
-   private final ExecutorTask task = new ExecutorTask();
+   /**
+    * Using a method reference instead of an inner class allows the caller to reduce the pointer chasing
+    * when accessing ProcessorBase.this fields/methods.
+    */
+   private final Runnable task = this::executePendingTasks;
 
    // used by stateUpdater
    @SuppressWarnings("unused")
    private volatile int state = STATE_NOT_RUNNING;
-
+   // Request of forced shutdown
+   private volatile boolean requestedForcedShutdown = false;
+   // Request of graceful (non-forced) shutdown:
    private volatile boolean requestedShutdown = false;
 
-   private volatile boolean started = true;
-
    private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
-   private final class ExecutorTask implements Runnable {
-
-      @Override
-      public void run() {
-         do {
-            //if there is no thread active and is not already dead then we run
-            if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
-               enter();
-               try {
-                  T task = tasks.poll();
-                  //while the queue is not empty we process in order
-                  while (task != null && !requestedShutdown) {
-                     //just drain the tasks if has been requested a shutdown to help the shutdown process
-                     if (requestedShutdown) {
-                        tasks.add(task);
-                        break;
-                     }
-                     doTask(task);
-                     task = tasks.poll();
-                  }
-               } finally {
-                  leave();
-                  //set state back to not running.
-                  stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING);
+   private void executePendingTasks() {
+      do {
+         //if there is no thread active and is not already dead then we run
+         if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
+            enter();
+            try {
+               T task;
+               //while the queue is not empty we process in order:
+               //if requestedForcedShutdown==true then no new tasks will be drained from the tasks queue.
+               while (!requestedForcedShutdown && (task = tasks.poll()) != null) {
+                  doTask(task);
+               }
+            } finally {
+               leave();
+               //set state back to not running if possible: shutdownNow could be called by doTask(task).
+               //If a shutdown has happened there is no need to continue polling tasks
+               if (!stateUpdater.compareAndSet(this, STATE_RUNNING, STATE_NOT_RUNNING)) {
+                  return;
                }
-            } else {
-               return;
             }
-            //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
-            //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
-            //this check fixes the issue
+         } else {
+            return;
          }
-         while (!tasks.isEmpty());
+         //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
+         //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
+         //this check fixes the issue
       }
+      while (!tasks.isEmpty() && !requestedShutdown);
    }
 
    /**
@@ -96,7 +91,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    public void shutdown(long timeout, TimeUnit unit) {
-      started = false;
+      requestedShutdown = true;
 
       if (!inHandler()) {
          // if it's in handler.. we just return
@@ -108,10 +103,10 @@ public abstract class ProcessorBase<T> extends HandlerBase {
     * It will wait the current execution (if there is one) to finish
     * but will not complete any further executions
     */
-   public List<T> shutdownNow() {
+   public int shutdownNow(Consumer<? super T> onPendingItem) {
       //alert anyone that has been requested (at least) an immediate shutdown
+      requestedForcedShutdown = true;
       requestedShutdown = true;
-      started = false;
 
       if (inHandler()) {
          stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
@@ -121,7 +116,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
             //alert the ExecutorTask (if is running) to just drain the current backlog of tasks
             final int startState = stateUpdater.get(this);
             if (startState == STATE_FORCED_SHUTDOWN) {
-               //another thread has completed a forced shutdown
+               //another thread has completed a forced shutdown: let it manage the task cleanup
                break;
             }
             if (startState == STATE_RUNNING) {
@@ -135,10 +130,16 @@ public abstract class ProcessorBase<T> extends HandlerBase {
          //can be set by just one caller.
          //As noted on the execute method there is a small chance that some tasks would be enqueued
       }
-      ArrayList<T> returnList = new ArrayList<>(tasks);
-      tasks.clear();
-
-      return returnList;
+      int pendingItems = 0;
+      //there is a small chance that execute() could race with this cleanup: the lock allows all-or-nothing behaviour between them
+      synchronized (tasks) {
+         T item;
+         while ((item = tasks.poll()) != null) {
+            onPendingItem.accept(item);
+            pendingItems++;
+         }
+      }
+      return pendingItems;
    }
 
    protected abstract void doTask(T task);
@@ -148,7 +149,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    public final boolean isFlushed() {
-      return stateUpdater.get(this) == STATE_NOT_RUNNING;
+      return this.state == STATE_NOT_RUNNING;
    }
 
    /**
@@ -158,14 +159,14 @@ public abstract class ProcessorBase<T> extends HandlerBase {
     * like in shutdown and failover situations.
     */
    public final boolean flush(long timeout, TimeUnit unit) {
-      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+      if (this.state == STATE_NOT_RUNNING) {
          // quick test, most of the time it will be empty anyways
          return true;
       }
 
       long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
       try {
-         while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
+         while (this.state == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
 
             if (tasks.isEmpty()) {
                return true;
@@ -177,23 +178,42 @@ public abstract class ProcessorBase<T> extends HandlerBase {
          // ignored
       }
 
-      return stateUpdater.get(this) == STATE_NOT_RUNNING;
+      return this.state == STATE_NOT_RUNNING;
    }
 
    protected void task(T command) {
-      if (!started) {
-         logger.debug("Ordered executor has been shutdown at", new Exception("debug"));
+      if (requestedShutdown) {
+         logAddOnShutdown();
       }
       //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
       tasks.add(command);
       //cache locally the state to avoid multiple volatile loads
       final int state = stateUpdater.get(this);
-      if (state == STATE_FORCED_SHUTDOWN) {
-         //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
-         tasks.clear();
-      } else if (state == STATE_NOT_RUNNING) {
+      if (state != STATE_RUNNING) {
+         onAddedTaskIfNotRunning(state);
+      }
+   }
+
+   /**
+    * This has to be called on the assumption that state!=STATE_RUNNING.
+    * It is packed separately from {@link #task(Object)} just for performance reasons: it
+    * handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
+    */
+   private void onAddedTaskIfNotRunning(int state) {
+      if (state == STATE_NOT_RUNNING) {
          //startPoller could be deleted but is maintained because is inherited
          delegate.execute(task);
+      } else if (state == STATE_FORCED_SHUTDOWN) {
+         //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add
+         synchronized (tasks) {
+            tasks.clear();
+         }
+      }
+   }
+
+   private static void logAddOnShutdown() {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Ordered executor has been gently shutdown at", new Exception("debug"));
       }
    }
 
@@ -208,7 +228,8 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    public final int status() {
-      return stateUpdater.get(this);
+      //avoid using the updater because in older versions of JDK 8 it isn't optimized as well as a plain volatile read
+      return this.state;
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
index 4e2bbba..345cbb5 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -82,7 +82,7 @@ public class OrderedExecutorSanityTest {
 
 
    @Test
-   public void shutdownWithin() throws InterruptedException {
+   public void shutdownNowOnDelegateExecutor() throws InterruptedException {
       final ExecutorService executorService = Executors.newSingleThreadExecutor();
       try {
          final OrderedExecutor executor = new OrderedExecutor(executorService);
@@ -93,7 +93,7 @@ public class OrderedExecutorSanityTest {
          executor.execute(() -> {
             try {
                latch.await(1, TimeUnit.MINUTES);
-               numberOfTasks.set(executor.shutdownNow().size());
+               numberOfTasks.set(executor.shutdownNow());
                ran.countDown();
             } catch (Exception e) {
                e.printStackTrace();

Reply | Threaded
Open this post in threaded view
|

[7/7] activemq-artemis git commit: ARTEMIS-1495 Lock-free ProcessorBase::shutdownNow

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1495 Lock-free ProcessorBase::shutdownNow


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0fadc68c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0fadc68c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0fadc68c

Branch: refs/heads/master
Commit: 0fadc68ca503eb35d75ac95292cd85339dc8b017
Parents: 3c5b57f
Author: Francesco Nigro <[hidden email]>
Authored: Wed Nov 8 12:03:49 2017 +0100
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ProcessorBase.java     | 72 +++++++++++++-------
 1 file changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0fadc68c/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 8d19c22..73dbf2f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -21,11 +21,13 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
 
 public abstract class ProcessorBase<T> {
 
    private static final int STATE_NOT_RUNNING = 0;
    private static final int STATE_RUNNING = 1;
+   private static final int STATE_FORCED_SHUTDOWN = 2;
 
    protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
 
@@ -33,12 +35,11 @@ public abstract class ProcessorBase<T> {
 
    private final ExecutorTask task = new ExecutorTask();
 
-   private final Object startedGuard = new Object();
-   private volatile boolean started = true;
-
    // used by stateUpdater
    @SuppressWarnings("unused")
-   private volatile int state = 0;
+   private volatile int state = STATE_NOT_RUNNING;
+
+   private volatile boolean requestedShutdown = false;
 
    private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
@@ -47,26 +48,22 @@ public abstract class ProcessorBase<T> {
       @Override
       public void run() {
          do {
-            //if there is no thread active then we run
+            //if there is no thread active and is not already dead then we run
             if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
-               T task = tasks.poll();
-               //while the queue is not empty we process in order
-
-               // All we care on started, is that a current task is not running as we call shutdown.
-               // for that reason this first run doesn't need to be under any lock
-               while (task != null && started) {
-
-                  // Synchronized here is just to guarantee that a current task is finished before
-                  // the started update can be taken as false
-                  synchronized (startedGuard) {
-                     if (started) {
+               try {
+                  T task = tasks.poll();
+                  //while the queue is not empty we process in order
+                  while (task != null) {
+                     //just drain the tasks if has been requested a shutdown to help the shutdown process
+                     if (!requestedShutdown) {
                         doTask(task);
                      }
+                     task = tasks.poll();
                   }
-                  task = tasks.poll();
+               } finally {
+                  //set state back to not running.
+                  stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
                }
-               //set state back to not running.
-               stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
             } else {
                return;
             }
@@ -81,10 +78,28 @@ public abstract class ProcessorBase<T> {
    /** It will wait the current execution (if there is one) to finish
     *  but will not complete any further executions */
    public void shutdownNow() {
-      synchronized (startedGuard) {
-         started = false;
+      //alert anyone that has been requested (at least) an immediate shutdown
+      requestedShutdown = true;
+      //it could take a very long time depending on the current executing task
+      do {
+         //alert the ExecutorTask (if is running) to just drain the current backlog of tasks
+         final int startState = stateUpdater.get(this);
+         if (startState == STATE_FORCED_SHUTDOWN) {
+            //another thread has completed a forced shutdown
+            return;
+         }
+         if (startState == STATE_RUNNING) {
+            //wait 100 ms to avoid burning CPU while waiting and
+            //give other threads a chance to make progress
+            LockSupport.parkNanos(100_000_000L);
+         }
       }
+      while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
+      //this could happen just one time: the forced shutdown state is the last one and
+      //can be set by just one caller.
+      //As noted on the execute method there is a small chance that some tasks would be enqueued
       tasks.clear();
+      //we can report the killed tasks somehow: ExecutorService do the same on shutdownNow
    }
 
    protected abstract void doTask(T task);
@@ -98,11 +113,18 @@ public abstract class ProcessorBase<T> {
    }
 
    protected void task(T command) {
-      // There is no need to verify the lock here.
-      // you can only turn of running once
-      if (started) {
+      if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) {
+         //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
          tasks.add(command);
-         startPoller();
+         //cache locally the state to avoid multiple volatile loads
+         final int state = stateUpdater.get(this);
+         if (state == STATE_FORCED_SHUTDOWN) {
+            //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
+            tasks.clear();
+         } else if (state == STATE_NOT_RUNNING) {
+            //startPoller could be deleted but is maintained because is inherited
+            delegate.execute(task);
+         }
       }
    }