activemq-artemis git commit: ARTEMIS-1333 fixing test, cannot flush itself from Runnable

Previous Topic Next Topic
 
Classic | List | Threaded
1 message Options
Reply | Threaded (open this post in threaded view) | Report Content as Inappropriate

activemq-artemis git commit: ARTEMIS-1333 fixing test, cannot flush itself from Runnable

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 4762e52ef -> 012fe58b2


ARTEMIS-1333 fixing test, cannot flush itself from Runnable


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/012fe58b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/012fe58b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/012fe58b

Branch: refs/heads/master
Commit: 012fe58b2c9a953a7714fb0bab938a59c6e74f5d
Parents: 4762e52
Author: Clebert Suconic <[hidden email]>
Authored: Wed Aug 9 15:57:55 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Wed Aug 9 16:02:10 2017 -0400

----------------------------------------------------------------------
 .../artemis/utils/actors/ProcessorBase.java     |  5 ++
 .../core/ServerSessionPacketHandler.java        | 56 ++++++++++++--------
 .../protocol/core/impl/CoreSessionCallback.java |  2 +-
 .../ActiveMQServerControlUsingCoreTest.java     |  9 ++++
 4 files changed, 48 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/012fe58b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index fcd197d..dbc0776 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -91,6 +91,11 @@ public abstract class ProcessorBase<T> {
       long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
       try {
          while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
+
+            if (tasks.isEmpty()) {
+               return true;
+            }
+
             Thread.sleep(10);
          }
       } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/012fe58b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 87ac615..982bc88 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -158,6 +159,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final boolean direct;
 
+   private static final ThreadLocal<AtomicBoolean> inHandler = ThreadLocal.withInitial(AtomicBoolean::new);
+
    public ServerSessionPacketHandler(final ActiveMQServer server,
                                      final CoreProtocolManager manager,
                                      final ServerSession session,
@@ -225,8 +228,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    }
 
    public void flushExecutor() {
-      packetActor.flush();
-      callExecutor.flush();
+      if (!inHandler.get().get()) {
+         packetActor.flush();
+         callExecutor.flush();
+      }
    }
 
    public void close() {
@@ -256,28 +261,33 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       if (logger.isTraceEnabled()) {
          logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
       }
-      final byte type = packet.getType();
-      switch (type) {
-         case SESS_SEND: {
-            onSessionSend(packet);
-            break;
-         }
-         case SESS_ACKNOWLEDGE: {
-            onSessionAcknowledge(packet);
-            break;
-         }
-         case SESS_PRODUCER_REQUEST_CREDITS: {
-            onSessionRequestProducerCredits(packet);
-            break;
-         }
-         case SESS_FLOWTOKEN: {
-            onSessionConsumerFlowCredit(packet);
-            break;
+      inHandler.get().set(true);
+      try {
+         final byte type = packet.getType();
+         switch (type) {
+            case SESS_SEND: {
+               onSessionSend(packet);
+               break;
+            }
+            case SESS_ACKNOWLEDGE: {
+               onSessionAcknowledge(packet);
+               break;
+            }
+            case SESS_PRODUCER_REQUEST_CREDITS: {
+               onSessionRequestProducerCredits(packet);
+               break;
+            }
+            case SESS_FLOWTOKEN: {
+               onSessionConsumerFlowCredit(packet);
+               break;
+            }
+            default:
+               // separating a method for everything else as JIT was faster this way
+               slowPacketHandler(packet);
+               break;
          }
-         default:
-            // separating a method for everything else as JIT was faster this way
-            slowPacketHandler(packet);
-            break;
+      } finally {
+         inHandler.get().set(false);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/012fe58b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 866130b..8168e6e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -65,7 +65,7 @@ public final class CoreSessionCallback implements SessionCallback {
    @Override
    public void close(boolean failed) {
       ServerSessionPacketHandler localHandler = handler;
-      if (failed && localHandler != null) {
+      if (localHandler != null) {
          // We wait any pending tasks before we make this as closed
          localHandler.flushExecutor();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/012fe58b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index cbe3ce5..ed704ef 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -47,6 +47,15 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
    public void testScaleDownWithConnector() throws Exception {
    }
 
+
+   // it doesn't make sense through the core
+   // the pool will be shutdown while a connection is being used
+   // makes no sense!
+   @Override
+   public void testForceFailover() throws Exception {
+   }
+
+
    @Override
    protected ActiveMQServerControl createManagementControl() throws Exception {
       return new ActiveMQServerControl() {

Loading...