[activemq-artemis] branch master updated (7b34b56 -> e5e5744)

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] branch master updated (7b34b56 -> e5e5744)

nigrofranz
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 7b34b56  This closes #2495
     new a40a459  ARTEMIS-2205 Netty is used in a more idiomatic way
     new d79762f  ARTEMIS-2205 Refactor AMQP Processing into Netty Thread
     new 8281e3b  ARTEMIS-2205 Optimizing some Lambda usages
     new e5e5744  This closes #2467

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../artemis/cli/commands/etc/artemis.profile       |   3 +
 .../artemis/api/core/TransportConfiguration.java   |  11 +
 .../core/remoting/impl/netty/NettyConnection.java  | 125 +++------
 .../org/apache/activemq/artemis/junit/Wait.java    |   7 +-
 .../amqp/broker/AMQPConnectionCallback.java        |  14 +-
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 165 ++++--------
 .../broker/ActiveMQProtonRemotingConnection.java   |   3 +-
 .../amqp/broker/ProtonProtocolManager.java         |  10 +
 .../amqp/proton/AMQPConnectionContext.java         | 228 ++++++++--------
 .../protocol/amqp/proton/AMQPSessionContext.java   |  40 ++-
 .../amqp/proton/ProtonServerReceiverContext.java   | 136 +++++-----
 .../amqp/proton/ProtonServerSenderContext.java     | 196 +++++++++-----
 .../amqp/proton/handler/ExecutorNettyAdapter.java  | 221 ++++++++++++++++
 .../amqp/proton/handler/ProtonHandler.java         | 287 +++++++++++----------
 .../transaction/ProtonTransactionHandler.java      |  14 +-
 .../proton/transaction/ProtonTransactionImpl.java  |  25 +-
 .../amqp/broker/AMQPSessionCallbackTest.java       |  83 +++---
 .../core/paging/cursor/PagedReferenceImpl.java     |  26 +-
 .../activemq/artemis/core/postoffice/Binding.java  |   4 +
 .../activemq/artemis/core/postoffice/Bindings.java |   3 +
 .../artemis/core/postoffice/impl/BindingsImpl.java | 145 +++++++----
 .../core/postoffice/impl/LocalQueueBinding.java    |   5 +
 .../core/postoffice/impl/PostOfficeImpl.java       | 161 ++++++------
 .../remoting/server/impl/RemotingServiceImpl.java  |   2 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   2 +-
 .../activemq/artemis/core/server/Consumer.java     |   8 +
 .../artemis/core/server/MessageReference.java      |  11 +
 .../apache/activemq/artemis/core/server/Queue.java |   8 +
 .../artemis/core/server/RoutingContext.java        |  29 +++
 .../artemis/core/server/ServerConsumer.java        |   6 +-
 .../artemis/core/server/ServerSession.java         |  17 ++
 .../artemis/core/server/impl/LastValueQueue.java   |  12 +
 .../core/server/impl/MessageReferenceImpl.java     |  26 +-
 .../artemis/core/server/impl/QueueImpl.java        |  54 ++--
 .../core/server/impl/RoutingContextImpl.java       |  82 +++++-
 .../core/server/impl/ServerConsumerImpl.java       |  22 +-
 .../core/server/impl/ServerSessionImpl.java        |  41 ++-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   5 +
 .../integration/addressing/AddressingTest.java     |   2 +-
 .../integration/amqp/AmqpExpiredMessageTest.java   |   2 +-
 .../integration/amqp/AmqpFlowControlFailTest.java  |   4 +-
 .../integration/amqp/AmqpSendReceiveTest.java      |  49 ++++
 .../integration/amqp/AmqpTransactionTest.java      |   2 +-
 .../integration/amqp/JMSNonDestructiveTest.java    |   8 +-
 .../tests/integration/cli/DummyServerConsumer.java |   5 +
 .../tests/integration/client/ConsumerTest.java     |  10 +-
 .../tests/integration/client/HangConsumerTest.java |   5 +
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 +
 .../impl/WildcardAddressManagerUnitTest.java       |   5 +
 49 files changed, 1529 insertions(+), 805 deletions(-)
 create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java

Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] 01/04: ARTEMIS-2205 Netty is used in a more idiomatic way

nigrofranz
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit a40a459f8c536a10a0dccae6e522ec38f09dd544
Author: Francesco Nigro <[hidden email]>
AuthorDate: Sat Mar 3 18:58:21 2018 +0100

    ARTEMIS-2205 Netty is used in a more idiomatic way
   
    This helped decreasing a lot of pressure on GC by not creating
    as many runnables for each write.
   
    Besides this helps fixing some of the issues I would have had on refactoring AMQP
    over flushing writes and other asynchronous issues.
---
 .../core/remoting/impl/netty/NettyConnection.java  | 125 ++++++---------------
 1 file changed, 35 insertions(+), 90 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index f8195fb..1032a35 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
 import io.netty.buffer.ByteBuf;
@@ -60,19 +59,12 @@ public class NettyConnection implements Connection {
     * here for when the connection (or Netty Channel) becomes available again.
     */
    private final List<ReadyListener> readyListeners = new ArrayList<>();
-   private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = ThreadLocal.withInitial(ArrayList::new);
+   private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = new ThreadLocal<>();
 
    private final boolean batchingEnabled;
    private final int writeBufferHighWaterMark;
    private final int batchLimit;
 
-   /**
-    * This counter is splitted in 2 variables to write it with less performance
-    * impact: no volatile get is required to update its value
-    */
-   private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong();
-   private long pendingWritesOnEventLoop = 0;
-
    private boolean closed;
    private RemotingConnection protocolConnection;
 
@@ -129,18 +121,6 @@ public class NettyConnection implements Connection {
       return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    }
 
-   public final long pendingWritesOnEventLoop() {
-      final EventLoop eventLoop = channel.eventLoop();
-      final boolean inEventLoop = eventLoop.inEventLoop();
-      final long pendingWritesOnEventLoop;
-      if (inEventLoop) {
-         pendingWritesOnEventLoop = this.pendingWritesOnEventLoop;
-      } else {
-         pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get();
-      }
-      return pendingWritesOnEventLoop;
-   }
-
    public final Channel getNettyChannel() {
       return channel;
    }
@@ -163,19 +143,27 @@ public class NettyConnection implements Connection {
 
    @Override
    public final void fireReady(final boolean ready) {
-      final ArrayList<ReadyListener> readyToCall = localListenersPool.get();
+      ArrayList<ReadyListener> readyToCall = localListenersPool.get();
+      if (readyToCall != null) {
+         localListenersPool.set(null);
+      }
       synchronized (readyListeners) {
          this.ready = ready;
 
          if (ready) {
             final int size = this.readyListeners.size();
-            readyToCall.ensureCapacity(size);
+            if (readyToCall != null) {
+               readyToCall.ensureCapacity(size);
+            }
             try {
                for (int i = 0; i < size; i++) {
                   final ReadyListener readyListener = readyListeners.get(i);
                   if (readyListener == null) {
                      break;
                   }
+                  if (readyToCall == null) {
+                     readyToCall = new ArrayList<>(size);
+                  }
                   readyToCall.add(readyListener);
                }
             } finally {
@@ -183,18 +171,23 @@ public class NettyConnection implements Connection {
             }
          }
       }
-      try {
-         final int size = readyToCall.size();
-         for (int i = 0; i < size; i++) {
-            try {
-               final ReadyListener readyListener = readyToCall.get(i);
-               readyListener.readyForWriting();
-            } catch (Throwable logOnly) {
-               ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
+      if (readyToCall != null) {
+         try {
+            readyToCall.forEach(readyListener -> {
+               try {
+                  readyListener.readyForWriting();
+               } catch (Throwable logOnly) {
+                  ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
+               }
+            });
+         } catch (Throwable t) {
+            ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
+         } finally {
+            readyToCall.clear();
+            if (localListenersPool.get() != null) {
+               localListenersPool.set(readyToCall);
             }
          }
-      } finally {
-         readyToCall.clear();
       }
    }
 
@@ -256,7 +249,7 @@ public class NettyConnection implements Connection {
       } catch (OutOfMemoryError oom) {
          final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
          // I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
-         logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
+         logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
          throw oom;
       }
    }
@@ -342,10 +335,7 @@ public class NettyConnection implements Connection {
    private boolean canWrite(final int requiredCapacity) {
       //evaluate if the write request could be taken:
       //there is enough space in the write buffer?
-      //The pending writes on event loop will eventually go into the Netty write buffer, hence consider them
-      //as part of the heuristic!
-      final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop();
-      final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel();
+      final long totalPendingWrites = this.pendingWritesOnChannel();
       final boolean canWrite;
       if (requiredCapacity > this.writeBufferHighWaterMark) {
          canWrite = totalPendingWrites == 0;
@@ -369,34 +359,6 @@ public class NettyConnection implements Connection {
       }
       //no need to lock because the Netty's channel is thread-safe
       //and the order of write is ensured by the order of the write calls
-      final EventLoop eventLoop = channel.eventLoop();
-      final boolean inEventLoop = eventLoop.inEventLoop();
-      if (!inEventLoop) {
-         writeNotInEventLoop(buffer, flush, batched, futureListener);
-      } else {
-         // OLD COMMENT:
-         // create a task which will be picked up by the eventloop and trigger the write.
-         // This is mainly needed as this method is triggered by different threads for the same channel.
-         // if we not do this we may produce out of order writes.
-         // NOTE:
-         // the submitted task does not effect in any way the current written size in the batch
-         // until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!!
-         // To solve it, will be necessary to manually perform the count of the current batch instead of rely on the
-         // Channel:Config::writeBufferHighWaterMark value.
-         this.pendingWritesOnEventLoop += readableBytes;
-         this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
-         eventLoop.execute(() -> {
-            this.pendingWritesOnEventLoop -= readableBytes;
-            this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
-            writeInEventLoop(buffer, flush, batched, futureListener);
-         });
-      }
-   }
-
-   private void writeNotInEventLoop(ActiveMQBuffer buffer,
-                                    final boolean flush,
-                                    final boolean batched,
-                                    final ChannelFutureListener futureListener) {
       final Channel channel = this.channel;
       final ChannelPromise promise;
       if (flush || (futureListener != null)) {
@@ -406,7 +368,6 @@ public class NettyConnection implements Connection {
       }
       final ChannelFuture future;
       final ByteBuf bytes = buffer.byteBuf();
-      final int readableBytes = bytes.readableBytes();
       assert readableBytes >= 0;
       final int writeBatchSize = this.batchLimit;
       final boolean batchingEnabled = this.batchingEnabled;
@@ -420,33 +381,17 @@ public class NettyConnection implements Connection {
       }
       if (flush) {
          //NOTE: this code path seems used only on RemotingConnection::disconnect
-         waitFor(promise, DEFAULT_WAIT_MILLIS);
+         flushAndWait(channel, promise);
       }
    }
 
-   private void writeInEventLoop(ActiveMQBuffer buffer,
-                                 final boolean flush,
-                                 final boolean batched,
-                                 final ChannelFutureListener futureListener) {
-      //no need to lock because the Netty's channel is thread-safe
-      //and the order of write is ensured by the order of the write calls
-      final ChannelPromise promise;
-      if (futureListener != null) {
-         promise = channel.newPromise();
-      } else {
-         promise = channel.voidPromise();
-      }
-      final ChannelFuture future;
-      final ByteBuf bytes = buffer.byteBuf();
-      final int readableBytes = bytes.readableBytes();
-      final int writeBatchSize = this.batchLimit;
-      if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
-         future = writeBatch(bytes, readableBytes, promise);
+   private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
+      if (!channel.eventLoop().inEventLoop()) {
+         waitFor(promise, DEFAULT_WAIT_MILLIS);
       } else {
-         future = channel.writeAndFlush(bytes, promise);
-      }
-      if (futureListener != null) {
-         future.addListener(futureListener);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Calling write with flush from a thread where it's not allowed");
+         }
       }
    }
 

Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] 02/04: ARTEMIS-2205 Refactor AMQP Processing into Netty Thread

nigrofranz
In reply to this post by nigrofranz
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d79762fa0489954d91e417f6110c213bc3d6db06
Author: Clebert Suconic <[hidden email]>
AuthorDate: Mon Dec 17 09:11:54 2018 -0500

    ARTEMIS-2205 Refactor AMQP Processing into Netty Thread
   
    These improvements were also part of this task:
    - Routing is now cached as much as possible.
    - A new Runnable is avoided for each individual message,
      since we use the Netty executor to perform delivery
   
    https://issues.apache.org/jira/browse/ARTEMIS-2205
---
 .../artemis/cli/commands/etc/artemis.profile       |   3 +
 .../artemis/api/core/TransportConfiguration.java   |  11 +
 .../org/apache/activemq/artemis/junit/Wait.java    |   7 +-
 .../amqp/broker/AMQPConnectionCallback.java        |  14 +-
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 165 ++++--------
 .../broker/ActiveMQProtonRemotingConnection.java   |   3 +-
 .../amqp/broker/ProtonProtocolManager.java         |  10 +
 .../amqp/proton/AMQPConnectionContext.java         | 228 ++++++++--------
 .../protocol/amqp/proton/AMQPSessionContext.java   |  40 ++-
 .../amqp/proton/ProtonServerReceiverContext.java   | 136 +++++-----
 .../amqp/proton/ProtonServerSenderContext.java     | 194 +++++++++-----
 .../amqp/proton/handler/ExecutorNettyAdapter.java  | 221 ++++++++++++++++
 .../amqp/proton/handler/ProtonHandler.java         | 287 +++++++++++----------
 .../transaction/ProtonTransactionHandler.java      |  14 +-
 .../proton/transaction/ProtonTransactionImpl.java  |  25 +-
 .../amqp/broker/AMQPSessionCallbackTest.java       |  83 +++---
 .../core/paging/cursor/PagedReferenceImpl.java     |  22 +-
 .../activemq/artemis/core/postoffice/Binding.java  |   4 +
 .../activemq/artemis/core/postoffice/Bindings.java |   3 +
 .../artemis/core/postoffice/impl/BindingsImpl.java | 145 +++++++----
 .../core/postoffice/impl/LocalQueueBinding.java    |   5 +
 .../core/postoffice/impl/PostOfficeImpl.java       | 161 ++++++------
 .../remoting/server/impl/RemotingServiceImpl.java  |   2 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   2 +-
 .../activemq/artemis/core/server/Consumer.java     |   8 +
 .../artemis/core/server/MessageReference.java      |   2 +
 .../core/server/MessageReferenceCallback.java      |  27 ++
 .../apache/activemq/artemis/core/server/Queue.java |   8 +
 .../artemis/core/server/RoutingContext.java        |  29 +++
 .../artemis/core/server/ServerConsumer.java        |   6 +-
 .../artemis/core/server/ServerSession.java         |  17 ++
 .../artemis/core/server/impl/LastValueQueue.java   |  11 +
 .../core/server/impl/MessageReferenceImpl.java     |  23 +-
 .../artemis/core/server/impl/QueueImpl.java        |  54 ++--
 .../core/server/impl/RoutingContextImpl.java       |  82 +++++-
 .../core/server/impl/ServerConsumerImpl.java       |  22 +-
 .../core/server/impl/ServerSessionImpl.java        |  41 ++-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   5 +
 .../integration/addressing/AddressingTest.java     |   2 +-
 .../integration/amqp/AmqpExpiredMessageTest.java   |   2 +-
 .../integration/amqp/AmqpFlowControlFailTest.java  |   4 +-
 .../integration/amqp/AmqpSendReceiveTest.java      |  49 ++++
 .../integration/amqp/AmqpTransactionTest.java      |   2 +-
 .../integration/amqp/JMSNonDestructiveTest.java    |   8 +-
 .../tests/integration/cli/DummyServerConsumer.java |   5 +
 .../tests/integration/client/ConsumerTest.java     |  10 +-
 .../tests/integration/client/HangConsumerTest.java |   5 +
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 +
 .../impl/WildcardAddressManagerUnitTest.java       |   5 +
 49 files changed, 1502 insertions(+), 715 deletions(-)

diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
index 876b4cb..af8aa86 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
@@ -43,3 +43,6 @@ JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -D
 
 # Debug args: Uncomment to enable debug
 #DEBUG_ARGS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
+
+# Debug args: Uncomment for async profiler
+#DEBUG_ARGS="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints"
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
index 7fcfcd5..ee285a3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
@@ -183,6 +183,17 @@ public class TransportConfiguration implements Serializable {
       return extraProps;
    }
 
+   public Map<String, Object> getCombinedParams() {
+      Map<String, Object> combined = new HashMap<>();
+      if (params != null) {
+         combined.putAll(params);
+      }
+      if (extraProps != null) {
+         combined.putAll(extraProps);
+      }
+      return combined;
+   }
+
    @Override
    public int hashCode() {
       int result = name != null ? name.hashCode() : 0;
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
index c0aa55d..5c817fb 100644
--- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
@@ -94,7 +94,12 @@ public class Wait {
 
 
    public static void assertTrue(String failureMessage, Condition condition) throws Exception {
-      boolean result = waitFor(condition);
+      assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
+   }
+
+   public static void assertTrue(String failureMessage, Condition condition, final long duration) throws Exception {
+
+      boolean result = waitFor(condition, duration);
 
       if (!result) {
          Assert.fail(failureMessage);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 84fdd24..d34ce80 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -73,7 +73,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
 
    protected AMQPConnectionContext amqpConnection;
 
-   private final Executor closeExecutor;
+   private final Executor sessionExecutor;
 
    private String remoteContainerId;
 
@@ -85,15 +85,19 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
 
    public AMQPConnectionCallback(ProtonProtocolManager manager,
                                  Connection connection,
-                                 Executor closeExecutor,
+                                 Executor sessionExecutor,
                                  ActiveMQServer server) {
       this.manager = manager;
       this.connection = connection;
-      this.closeExecutor = closeExecutor;
+      this.sessionExecutor = sessionExecutor;
       this.server = server;
       saslMechanisms = manager.getSaslMechanisms();
    }
 
+   public Connection getTransportConnection() {
+      return connection;
+   }
+
    public String[] getSaslMechanisms() {
       return saslMechanisms;
    }
@@ -213,7 +217,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
 
 
    public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
-      return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
+      return new AMQPSessionCallback(this, manager, connection, this.connection, sessionExecutor, server.newOperationContext());
    }
 
    public void sendSASLSupported() {
@@ -256,7 +260,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
    public Binary newTransaction() {
       XidImpl xid = newXID();
       Binary binary = new Binary(xid.getGlobalTransactionId());
-      Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
+      Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1, amqpConnection);
       transactions.put(binary, transaction);
       return binary;
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 1ca4410..0e2cf6d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -40,15 +37,14 @@ import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerProducer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
-import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -104,7 +100,8 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final Executor sessionExecutor;
 
-   private final AtomicBoolean draining = new AtomicBoolean(false);
+   private final boolean directDeliver;
+
 
    private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
@@ -125,6 +122,7 @@ public class AMQPSessionCallback implements SessionCallback {
       this.transportConnection = transportConnection;
       this.sessionExecutor = executor;
       this.operationContext = operationContext;
+      this.directDeliver = manager.isDirectDeliver();
    }
 
    @Override
@@ -133,28 +131,6 @@ public class AMQPSessionCallback implements SessionCallback {
       return transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED;
    }
 
-   public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
-      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
-      if (drain) {
-         // If the draining is already running, then don't do anything
-         if (draining.compareAndSet(false, true)) {
-            final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
-            serverConsumer.forceDelivery(1, new Runnable() {
-               @Override
-               public void run() {
-                  try {
-                     plugSender.reportDrained();
-                  } finally {
-                     draining.set(false);
-                  }
-               }
-            });
-         }
-      } else {
-         serverConsumer.receiveCredits(-1);
-      }
-   }
-
    public void withinContext(RunnableEx run) throws Exception {
       OperationContext context = recoverContext();
       try {
@@ -180,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    @Override
    public boolean supportsDirectDelivery() {
-      return false;
+      return manager.isDirectDeliver();
    }
 
    public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
@@ -347,7 +323,6 @@ public class AMQPSessionCallback implements SessionCallback {
       return result;
    }
 
-
    public AddressQueryResult addressQuery(SimpleString addressName,
                                           RoutingType routingType,
                                           boolean autoCreate) throws Exception {
@@ -373,41 +348,8 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    public void closeSender(final Object brokerConsumer) throws Exception {
-
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      Runnable runnable = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               consumer.close(false);
-               latch.countDown();
-            } catch (Exception e) {
-            }
-         }
-      };
-
-      // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
-      // to avoid deadlocks the close has to be done outside of the main thread on an executor
-      // otherwise you could get a deadlock
-      Executor executor = protonSPI.getExeuctor();
-
-      if (executor != null) {
-         executor.execute(runnable);
-      } else {
-         runnable.run();
-      }
-
-      try {
-         // a short timeout will do.. 1 second is already long enough
-         if (!latch.await(1, TimeUnit.SECONDS)) {
-            logger.debug("Could not close consumer on time");
-         }
-      } catch (InterruptedException e) {
-         throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
-      }
-
+      consumer.close(false);
       consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
    }
 
@@ -418,12 +360,19 @@ public class AMQPSessionCallback implements SessionCallback {
    public void close() throws Exception {
       //need to check here as this can be called if init fails
       if (serverSession != null) {
-         OperationContext context = recoverContext();
-         try {
-            serverSession.close(false);
-         } finally {
-            resetContext(context);
-         }
+         // we cannot hold the nettyExecutor on this rollback here, otherwise other connections will be waiting
+         sessionExecutor.execute(() -> {
+            OperationContext context = recoverContext();
+            try {
+               try {
+                  serverSession.close(false);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            } finally {
+               resetContext(context);
+            }
+         });
       }
    }
 
@@ -468,7 +417,8 @@ public class AMQPSessionCallback implements SessionCallback {
                           final Delivery delivery,
                           SimpleString address,
                           int messageFormat,
-                          ReadableBuffer data) throws Exception {
+                          ReadableBuffer data,
+                          RoutingContext routingContext) throws Exception {
       AMQPMessage message = new AMQPMessage(messageFormat, data, null, coreMessageObjectPools);
       if (address != null) {
          message.setAddress(address);
@@ -503,7 +453,7 @@ public class AMQPSessionCallback implements SessionCallback {
                rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
             }
          } else {
-            serverSend(transaction, message, delivery, receiver);
+            serverSend(context, transaction, message, delivery, receiver, routingContext);
          }
       } finally {
          resetContext(oldcontext);
@@ -520,14 +470,11 @@ public class AMQPSessionCallback implements SessionCallback {
       afterIO(new IOCallback() {
          @Override
          public void done() {
-            connection.lock();
-            try {
+            connection.runLater(() -> {
                delivery.disposition(rejected);
                delivery.settle();
-            } finally {
-               connection.unlock();
-            }
-            connection.flush();
+               connection.flush();
+            });
          }
 
          @Override
@@ -538,19 +485,20 @@ public class AMQPSessionCallback implements SessionCallback {
 
    }
 
-   private void serverSend(final Transaction transaction,
+   private void serverSend(final ProtonServerReceiverContext context,
+                           final Transaction transaction,
                            final Message message,
                            final Delivery delivery,
-                           final Receiver receiver) throws Exception {
+                           final Receiver receiver,
+                           final RoutingContext routingContext) throws Exception {
       message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
       invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
-      serverSession.send(transaction, message, false, false);
+      serverSession.send(transaction, message, directDeliver, false, routingContext);
 
       afterIO(new IOCallback() {
          @Override
          public void done() {
-            connection.lock();
-            try {
+            connection.runLater(() -> {
                if (delivery.getRemoteState() instanceof TransactionalState) {
                   TransactionalState txAccepted = new TransactionalState();
                   txAccepted.setOutcome(Accepted.getInstance());
@@ -561,21 +509,17 @@ public class AMQPSessionCallback implements SessionCallback {
                   delivery.disposition(Accepted.getInstance());
                }
                delivery.settle();
-            } finally {
-               connection.unlock();
-            }
-            connection.flush();
+               context.flow();
+               connection.flush();
+            });
          }
 
          @Override
          public void onError(int errorCode, String errorMessage) {
-            connection.lock();
-            try {
+            connection.runNow(() -> {
                receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
                connection.flush();
-            } finally {
-               connection.unlock();
-            }
+            });
          }
       });
    }
@@ -635,15 +579,12 @@ public class AMQPSessionCallback implements SessionCallback {
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
 
       try {
-         return plugSender.deliverMessage(ref, deliveryCount, transportConnection);
+         return plugSender.deliverMessage(ref, consumer);
       } catch (Exception e) {
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
             connection.flush();
-         } finally {
-            connection.unlock();
-         }
+         });
          throw new IllegalStateException("Can't deliver message " + e, e);
       }
 
@@ -673,23 +614,22 @@ public class AMQPSessionCallback implements SessionCallback {
    @Override
    public void disconnect(ServerConsumer consumer, SimpleString queueName) {
       ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
-      connection.lock();
-      try {
-         ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
-         connection.flush();
-      } catch (ActiveMQAMQPException e) {
-         logger.error("Error closing link for " + consumer.getQueue().getAddress());
-      } finally {
-         connection.unlock();
-      }
+      connection.runNow(() -> {
+         try {
+            ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
+            connection.flush();
+         } catch (ActiveMQAMQPException e) {
+            logger.error("Error closing link for " + consumer.getQueue().getAddress());
+         }
+      });
    }
 
    @Override
    public boolean hasCredits(ServerConsumer consumer) {
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
 
-      if (plugSender != null && plugSender.getSender().getCredit() > 0) {
-         return true;
+      if (plugSender != null) {
+         return plugSender.hasCredits();
       } else {
          return false;
       }
@@ -757,6 +697,10 @@ public class AMQPSessionCallback implements SessionCallback {
       this.transactionHandler = transactionHandler;
    }
 
+   public Connection getTransportConnection() {
+      return transportConnection;
+   }
+
    public ProtonTransactionHandler getTransactionHandler() {
       return this.transactionHandler;
    }
@@ -782,4 +726,7 @@ public class AMQPSessionCallback implements SessionCallback {
       }
 
    }
+   interface CreditRunnable extends Runnable {
+      boolean isRun();
+   }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 41f6e78..a06765d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -122,7 +122,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
       ErrorCondition errorCondition = new ErrorCondition();
       errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
       amqpConnection.close(errorCondition);
-      getTransportConnection().close();
+      // There's no need to flush, amqpConnection.close() is calling flush
+      // as long this semantic is kept no need to flush here
    }
 
    /**
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index d86dc81..5b9aa38 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -77,6 +77,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
 
    private Long amqpIdleTimeout;
 
+   private boolean directDeliver = true;
+
 
    /*
    * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
@@ -131,6 +133,14 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
       return this;
    }
 
+   public boolean isDirectDeliver() {
+      return directDeliver;
+   }
+
+   public ProtonProtocolManager setDirectDeliver(boolean directDeliver) {
+      this.directDeliver = directDeliver;
+      return this;
+   }
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 3552106..07b2875 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
-
 import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,12 +25,16 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoop;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExecutorNettyAdapter;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
@@ -59,7 +57,11 @@ import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
 
 public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
 
@@ -111,7 +113,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
-      this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
+      EventLoop nettyExecutor;
+      if (connectionCallback.getTransportConnection() instanceof NettyConnection) {
+         nettyExecutor = ((NettyConnection) connectionCallback.getTransportConnection()).getNettyChannel().eventLoop();
+      } else {
+         nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor());
+      }
+      this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
       handler.addEventHandler(this);
       Transport transport = handler.getTransport();
       transport.setEmitFlowEventOnSend(false);
@@ -127,6 +135,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       }
    }
 
+   public void requireInHandler() {
+      handler.requireHandler();
+   }
+
    public void scheduledFlush() {
       handler.scheduledFlush();
    }
@@ -159,35 +171,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    }
 
    public void destroy() {
-      connectionCallback.close();
+      handler.runLater(() -> connectionCallback.close());
    }
 
    public boolean isSyncOnFlush() {
       return false;
    }
 
-   public boolean tryLock(long time, TimeUnit timeUnit) {
-      return handler.tryLock(time, timeUnit);
-   }
-
-   public void lock() {
-      handler.lock();
-   }
-
-   public void unlock() {
-      handler.unlock();
-   }
-
-   public int capacity() {
-      return handler.capacity();
-   }
-
    public void flush() {
       handler.flush();
    }
 
    public void close(ErrorCondition errorCondition) {
-      handler.close(errorCondition);
+      handler.close(errorCondition, this);
    }
 
    protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
@@ -201,6 +197,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       return sessionExtension;
    }
 
+   public void runOnPool(Runnable run) {
+      handler.runOnPool(run);
+   }
+
+   public void runNow(Runnable run) {
+      handler.runNow(run);
+   }
+
+   public void runLater(Runnable run) {
+      handler.runLater(run);
+   }
+
    protected boolean validateConnection(Connection connection) {
       return connectionCallback.validateConnection(connection, handler.getSASLResult());
    }
@@ -224,6 +232,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    protected void initInternal() throws Exception {
    }
 
+   public AMQPConnectionCallback getConnectionCallback() {
+      return connectionCallback;
+   }
+
    protected void remoteLinkOpened(Link link) throws Exception {
 
       AMQPSessionContext protonSession = getSessionExtension(link.getSession());
@@ -314,7 +326,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
          if (!connectionCallback.isSupportsAnonymous()) {
             connectionCallback.sendSASLSupported();
             connectionCallback.close();
-            handler.close(null);
+            handler.close(null, this);
          }
       }
    }
@@ -334,7 +346,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    @Override
    public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
       connectionCallback.close();
-      handler.close(null);
+      handler.close(null, this);
    }
 
    @Override
@@ -359,59 +371,73 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteOpen(Connection connection) throws Exception {
-      lock();
+      handler.requireHandler();
       try {
-         try {
-            initInternal();
-         } catch (Exception e) {
-            log.error("Error init connection", e);
-         }
-         if (!validateConnection(connection)) {
-            connection.close();
-         } else {
-            connection.setContext(AMQPConnectionContext.this);
-            connection.setContainer(containerId);
-            connection.setProperties(connectionProperties);
-            connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-            connection.open();
-         }
-      } finally {
-         unlock();
+         initInternal();
+      } catch (Exception e) {
+         log.error("Error init connection", e);
+      }
+      if (!validateConnection(connection)) {
+         connection.close();
+      } else {
+         connection.setContext(AMQPConnectionContext.this);
+         connection.setContainer(containerId);
+         connection.setProperties(connectionProperties);
+         connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+         connection.open();
       }
       initialise();
 
-         /*
-         * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
-         * but its here in case we add support for outbound connections.
-         * */
+      /*
+      * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
+      * but its here in case we add support for outbound connections.
+      * */
       if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
          long nextKeepAliveTime = handler.tick(true);
          if (nextKeepAliveTime != 0 && scheduledPool != null) {
-            scheduledPool.schedule(new Runnable() {
-               @Override
-               public void run() {
-                  Long rescheduleAt = handler.tick(false);
-                  if (rescheduleAt == null) {
-                     // this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
-                     scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
-                  } else if (rescheduleAt != 0) {
-                     scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
-                  }
-               }
-            }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+            scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+         }
+      }
+   }
+
+   class TickerRunnable implements Runnable {
+
+      final ScheduleRunnable scheduleRunnable;
+
+      TickerRunnable(ScheduleRunnable scheduleRunnable) {
+         this.scheduleRunnable = scheduleRunnable;
+      }
+
+      @Override
+      public void run() {
+         Long rescheduleAt = handler.tick(false);
+         if (rescheduleAt == null) {
+            // this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
+            scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS);
+         } else if (rescheduleAt != 0) {
+            scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
          }
       }
    }
 
+   class ScheduleRunnable implements Runnable {
+
+      TickerRunnable tickerRunnable = new TickerRunnable(this);
+
+      @Override
+      public void run() {
+
+         // The actual tick has to happen within a Netty Worker, to avoid requiring a lock
+         // this will also be used to flush the data directly into netty connection's executor
+         handler.runLater(tickerRunnable);
+      }
+   }
+
    @Override
    public void onRemoteClose(Connection connection) {
-      lock();
-      try {
-         connection.close();
-         connection.free();
-      } finally {
-         unlock();
-      }
+      handler.requireHandler();
+      connection.close();
+      connection.free();
 
       for (AMQPSessionContext protonSession : sessions.values()) {
          protonSession.close();
@@ -430,31 +456,24 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteOpen(Session session) throws Exception {
+      handler.requireHandler();
       getSessionExtension(session).initialise();
-      lock();
-      try {
-         session.open();
-      } finally {
-         unlock();
-      }
+      session.open();
    }
 
    @Override
    public void onRemoteClose(Session session) throws Exception {
-      lock();
-      try {
+      handler.runLater(() -> {
          session.close();
          session.free();
-      } finally {
-         unlock();
-      }
 
-      AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
-      if (sessionContext != null) {
-         sessionContext.close();
-         sessions.remove(session);
-         session.setContext(null);
-      }
+         AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
+         if (sessionContext != null) {
+            sessionContext.close();
+            sessions.remove(session);
+            session.setContext(null);
+         }
+      });
    }
 
    @Override
@@ -471,40 +490,42 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onRemoteClose(Link link) throws Exception {
-      lock();
-      try {
+      handler.requireHandler();
+
+      // We scheduled it for later, as that will work through anything that's pending on the current deliveries.
+      runNow(() -> {
          link.close();
          link.free();
-      } finally {
-         unlock();
-      }
 
-      ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
-      if (linkContext != null) {
-         linkContext.close(true);
-      }
+         ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
+         if (linkContext != null) {
+            try {
+               linkContext.close(true);
+            } catch (Exception e) {
+               log.error(e.getMessage(), e);
+            }
+         }
+         flush();
+
+      });
    }
 
    @Override
    public void onRemoteDetach(Link link) throws Exception {
-      boolean handleAsClose = link.getSource() != null
-                              && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
+      handler.requireHandler();
+      boolean handleAsClose = link.getSource() != null && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
 
       if (handleAsClose) {
          onRemoteClose(link);
       } else {
-         lock();
-         try {
-            link.detach();
-            link.free();
-         } finally {
-            unlock();
-         }
+         link.detach();
+         link.free();
       }
    }
 
    @Override
    public void onLocalDetach(Link link) throws Exception {
+      handler.requireHandler();
       Object context = link.getContext();
       if (context instanceof ProtonServerSenderContext) {
          ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
@@ -514,6 +535,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
    @Override
    public void onDelivery(Delivery delivery) throws Exception {
+      handler.requireHandler();
       ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
       if (handler != null) {
          handler.onMessage(delivery);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 5cd3515..c8bb13e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -150,13 +150,11 @@ public class AMQPSessionContext extends ProtonInitializable {
       coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
 
       receiver.setContext(transactionHandler);
-      connection.lock();
-      try {
+      connection.runNow(() -> {
          receiver.open();
          receiver.flow(connection.getAmqpCredits());
-      } finally {
-         connection.unlock();
-      }
+         connection.flush();
+      });
    }
 
    public void addSender(Sender sender) throws Exception {
@@ -169,24 +167,20 @@ public class AMQPSessionContext extends ProtonInitializable {
          senders.put(sender, protonSender);
          serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
          sender.setContext(protonSender);
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             sender.open();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
 
          protonSender.start();
       } catch (ActiveMQAMQPException e) {
          senders.remove(sender);
          sender.setSource(null);
          sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             sender.close();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
       }
    }
 
@@ -206,22 +200,18 @@ public class AMQPSessionContext extends ProtonInitializable {
          ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
          sessionSPI.addProducer(serverProducer);
          receiver.setContext(protonReceiver);
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             receiver.open();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
       } catch (ActiveMQAMQPException e) {
          receivers.remove(receiver);
          receiver.setTarget(null);
          receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             receiver.close();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
       }
    }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index b0cfba0..1446373 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -25,7 +25,9 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -49,6 +51,9 @@ import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
+/**
+ * This is the equivalent for the ServerProducer
+ */
 public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
 
    private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
@@ -63,35 +68,43 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    protected final AMQPSessionCallback sessionSPI;
 
-   /** We create this AtomicRunnable with setRan.
-    *  This is because we always reuse the same instance.
-    *  In case the creditRunnable was run, we reset and send it over.
-    *  We set it as ran as the first one should always go through */
-   protected final AtomicRunnable creditRunnable;
+   RoutingContext routingContext = new RoutingContextImpl(null);
 
+   /**
+    * We create this AtomicRunnable with setRan.
+    * This is because we always reuse the same instance.
+    * In case the creditRunnable was run, we reset and send it over.
+    * We set it as ran as the first one should always go through
+    */
+   protected final AtomicRunnable creditRunnable;
 
-   /** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */
-   public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
+   /**
+    * This Credit Runnable may be used in Mock tests to simulate the credit semantic here
+    */
+   public static AtomicRunnable createCreditRunnable(int refill,
+                                                     int threshold,
+                                                     Receiver receiver,
+                                                     AMQPConnectionContext connection) {
+      Runnable creditRunnable = () -> {
+
+         connection.requireInHandler();
+         if (receiver.getCredit() <= threshold) {
+            int topUp = refill - receiver.getCredit();
+            if (topUp > 0) {
+               // System.out.println("Sending " + topUp + " towards client");
+               receiver.flow(topUp);
+               connection.flush();
+            }
+         }
+      };
       return new AtomicRunnable() {
          @Override
          public void atomicRun() {
-            connection.lock();
-            try {
-               if (receiver.getCredit() <= threshold) {
-                  int topUp = refill - receiver.getCredit();
-                  if (topUp > 0) {
-                     receiver.flow(topUp);
-                  }
-               }
-            } finally {
-               connection.unlock();
-            }
-            connection.flush();
+            connection.runNow(creditRunnable);
          }
       };
    }
 
-
    /*
     The maximum number of credits we will allocate to clients.
     This number is also used by the broker when refresh client credits.
@@ -249,41 +262,46 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
     */
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      try {
-         Receiver receiver = ((Receiver) delivery.getLink());
-
-         if (receiver.current() != delivery) {
-            return;
-         }
+      connection.requireInHandler();
+      Receiver receiver = ((Receiver) delivery.getLink());
 
-         if (delivery.isAborted()) {
-            // Aborting implicitly remotely settles, so advance
-            // receiver to the next delivery and settle locally.
-            receiver.advance();
-            delivery.settle();
+      if (receiver.current() != delivery) {
+         return;
+      }
 
-            // Replenish the credit if not doing a drain
-            if (!receiver.getDrain()) {
-               receiver.flow(1);
-            }
+      if (delivery.isAborted()) {
+         // Aborting implicitly remotely settles, so advance
+         // receiver to the next delivery and settle locally.
+         receiver.advance();
+         delivery.settle();
 
-            return;
-         } else if (delivery.isPartial()) {
-            return;
+         // Replenish the credit if not doing a drain
+         if (!receiver.getDrain()) {
+            receiver.flow(1);
          }
 
-         Transaction tx = null;
-         ReadableBuffer data = receiver.recv();
-         receiver.advance();
+         return;
+      } else if (delivery.isPartial()) {
+         return;
+      }
 
-         if (delivery.getRemoteState() instanceof TransactionalState) {
-            TransactionalState txState = (TransactionalState) delivery.getRemoteState();
-            tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
-         }
+      ReadableBuffer data = receiver.recv();
+      receiver.advance();
+      Transaction tx = null;
+
+      if (delivery.getRemoteState() instanceof TransactionalState) {
+         TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+         tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
+      }
 
-         sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
+      final Transaction txUsed = tx;
 
-         flow();
+      actualDelivery(delivery, receiver, data, txUsed);
+   }
+
+   private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer data, Transaction tx) {
+      try {
+         sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext);
       } catch (Exception e) {
          log.warn(e.getMessage(), e);
          Rejected rejected = new Rejected();
@@ -294,13 +312,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          } else {
             condition.setCondition(Symbol.valueOf("failed"));
          }
+         connection.runLater(() -> {
 
-         condition.setDescription(e.getMessage());
-         rejected.setError(condition);
+            condition.setDescription(e.getMessage());
+            rejected.setError(condition);
+
+            delivery.disposition(rejected);
+            delivery.settle();
+            flow();
+            connection.flush();
+         });
 
-         delivery.disposition(rejected);
-         delivery.settle();
-         flow();
       }
    }
 
@@ -324,6 +346,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    }
 
    public void flow() {
+      connection.requireInHandler();
       if (!creditRunnable.isRun()) {
          return; // nothing to be done as the previous one did not run yet
       }
@@ -339,13 +362,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    }
 
    public void drain(int credits) {
-      connection.lock();
-      try {
+      connection.runNow(() -> {
          receiver.drain(credits);
-      } finally {
-         connection.unlock();
-      }
-      connection.flush();
+         connection.flush();
+      });
    }
 
    public int drained() {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index c4aca48..843d1fe 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -20,7 +20,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -32,7 +33,10 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -49,7 +53,6 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -74,9 +77,9 @@ import org.apache.qpid.proton.engine.Sender;
 import org.jboss.logging.Logger;
 
 /**
- * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
+ * This is the Equivalent for the ServerConsumer
  */
-public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
+public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback {
 
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
 
@@ -104,6 +107,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private boolean isVolatile = false;
    private boolean preSettle;
    private SimpleString tempQueueName;
+   private final AtomicBoolean draining = new AtomicBoolean(false);
+
+   private int credits = 0;
+
+   private AtomicInteger pending = new AtomicInteger(0);
+   /**
+    * The model proton uses requires us to hold a lock in certain times
+    * to sync the credits we have versus the credits that are being held in proton
+    * */
+   private final Object creditsLock = new Object();
 
    public ProtonServerSenderContext(AMQPConnectionContext connection,
                                     Sender sender,
@@ -122,7 +135,51 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    @Override
    public void onFlow(int currentCredits, boolean drain) {
-      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
+      connection.requireInHandler();
+
+      setupCredit();
+
+      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer;
+      if (drain) {
+         // If the draining is already running, then don't do anything
+         if (draining.compareAndSet(false, true)) {
+            final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
+            serverConsumer.forceDelivery(1, new Runnable() {
+               @Override
+               public void run() {
+                  try {
+                     connection.runNow(() -> {
+                        plugSender.reportDrained();
+                        setupCredit();
+                     });
+                  } finally {
+                     draining.set(false);
+                  }
+               }
+            });
+         }
+      } else {
+         serverConsumer.receiveCredits(-1);
+      }
+   }
+
+   public boolean hasCredits() {
+      if (!connection.flowControl(brokerConsumer::promptDelivery)) {
+         return false;
+      }
+
+      synchronized (creditsLock) {
+         return credits > 0 && sender.getLocalState() != EndpointState.CLOSED;
+      }
+   }
+
+   private void setupCredit() {
+      synchronized (creditsLock) {
+         this.credits = sender.getCredit() - pending.get();
+         if (credits < 0) {
+            credits = 0;
+         }
+      }
    }
 
    public Sender getSender() {
@@ -469,20 +526,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          sender.setCondition(condition);
       }
       protonSession.removeSender(sender);
-      connection.lock();
-      try {
-         sender.close();
-      } finally {
-         connection.unlock();
-      }
-      connection.flush();
 
-      try {
-         sessionSPI.closeSender(brokerConsumer);
-      } catch (Exception e) {
-         log.warn(e.getMessage(), e);
-         throw new ActiveMQAMQPInternalErrorException(e.getMessage());
-      }
+      connection.runLater(() -> {
+         sender.close();
+         try {
+            sessionSPI.closeSender(brokerConsumer);
+         } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+         }
+         sender.close();
+         connection.flush();
+      });
    }
 
    /*
@@ -666,12 +720,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    }
 
    public void settle(Delivery delivery) {
-      connection.lock();
-      try {
-         delivery.settle();
-      } finally {
-         connection.unlock();
-      }
+      connection.requireInHandler();
+      delivery.settle();
    }
 
    public synchronized void checkState() {
@@ -681,42 +731,59 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    /**
     * handle an out going message from ActiveMQ Artemis, send via the Proton Sender
     */
-   public int deliverMessage(MessageReference messageReference, int deliveryCount, Connection transportConnection) throws Exception {
+   public int deliverMessage(final MessageReference messageReference, final ServerConsumer consumer) throws Exception {
 
       if (closed) {
          return 0;
       }
 
-      AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
-      sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
+      try {
+         synchronized (creditsLock) {
+            if (sender.getLocalState() == EndpointState.CLOSED) {
+               return 0;
+            }
+            pending.incrementAndGet();
+            credits--;
+         }
 
-      // we only need a tag if we are going to settle later
-      byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
+         if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
+            messageReference.setCallback(this);
+            connection.runNow((Runnable)messageReference);
+         } else {
+            connection.runNow(() -> executeDelivery(messageReference));
+         }
 
-      // Let the Message decide how to present the message bytes
-      ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
-      boolean releaseRequired = sendBuffer instanceof NettyReadable;
+         // This is because on AMQP we only send messages based in credits, not bytes
+         return 1;
+      } finally {
 
-      try {
-         int size = sendBuffer.remaining();
+      }
+   }
 
-         while (!connection.tryLock(1, TimeUnit.SECONDS)) {
-            if (closed || sender.getLocalState() == EndpointState.CLOSED) {
-               // If we're waiting on the connection lock, the link might be in the process of closing.  If this happens
-               // we return.
-               return 0;
-            } else {
-               if (log.isDebugEnabled()) {
-                  log.debug("Couldn't get lock on deliverMessage " + this);
-               }
-            }
+   @Override
+   public void executeDelivery(MessageReference messageReference) {
+
+      try {
+         if (sender.getLocalState() == EndpointState.CLOSED) {
+            log.debug("Not delivering message " + messageReference + " as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times");
+            return;
          }
+         AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
+
+         sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection());
+
+         // Let the Message decide how to present the message bytes
+         ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount());
+         // we only need a tag if we are going to settle later
+         byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
+
+         boolean releaseRequired = sendBuffer instanceof NettyReadable;
+         final Delivery delivery;
+         delivery = sender.delivery(tag, 0, tag.length);
+         delivery.setMessageFormat((int) message.getMessageFormat());
+         delivery.setContext(messageReference);
 
          try {
-            final Delivery delivery;
-            delivery = sender.delivery(tag, 0, tag.length);
-            delivery.setMessageFormat((int) message.getMessageFormat());
-            delivery.setContext(messageReference);
 
             if (releaseRequired) {
                sender.send(sendBuffer);
@@ -730,7 +797,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
             if (preSettle) {
                // Presettled means the client implicitly accepts any delivery we send it.
-               sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
+               try {
+                  sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
+               } catch (Exception e) {
+                  log.debug(e.getMessage(), e);
+               }
                delivery.settle();
             } else {
                sender.advance();
@@ -738,14 +809,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
             connection.flush();
          } finally {
-            connection.unlock();
-         }
-
-         return size;
-      } finally {
-         if (releaseRequired) {
-            ((NettyReadable) sendBuffer).getByteBuf().release();
+            synchronized (creditsLock) {
+               pending.decrementAndGet();
+            }
+            if (releaseRequired) {
+               ((NettyReadable) sendBuffer).getByteBuf().release();
+            }
          }
+      } catch (Exception e) {
+         log.warn(e.getMessage(), e);
+         brokerConsumer.errorProcessing(e, messageReference);
       }
    }
 
@@ -806,13 +879,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
     * Update link state to reflect that the previous drain attempt has completed.
     */
    public void reportDrained() {
-      connection.lock();
-      try {
-         sender.drained();
-      } finally {
-         connection.unlock();
-      }
-
+      connection.requireInHandler();
+      sender.drained();
       connection.flush();
    }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
new file mode 100644
index 0000000..9d0f09e
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty Executor
+ *  On that case this is a simple adapter for what's needed from these tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high cost.
+ *        We may do it some day if we find an easy way that won't clutter the code too much.
+ *  */
+public class ExecutorNettyAdapter implements EventLoop {
+
+   final ArtemisExecutor executor;
+
+   public ExecutorNettyAdapter(ArtemisExecutor executor) {
+      this.executor = executor;
+   }
+
+   @Override
+   public EventLoopGroup parent() {
+      return null;
+   }
+
+   @Override
+   public EventLoop next() {
+      return null;
+   }
+
+   @Override
+   public ChannelFuture register(Channel channel) {
+      return null;
+   }
+
+   @Override
+   public ChannelFuture register(ChannelPromise promise) {
+      return null;
+   }
+
+   @Override
+   public ChannelFuture register(Channel channel, ChannelPromise promise) {
+      return null;
+   }
+
+   @Override
+   public boolean inEventLoop() {
+      return inEventLoop(Thread.currentThread());
+   }
+
+   @Override
+   public boolean inEventLoop(Thread thread) {
+      return false;
+   }
+
+   @Override
+   public <V> Promise<V> newPromise() {
+      return null;
+   }
+
+   @Override
+   public <V> ProgressivePromise<V> newProgressivePromise() {
+      return null;
+   }
+
+   @Override
+   public <V> Future<V> newSucceededFuture(V result) {
+      return null;
+   }
+
+   @Override
+   public <V> Future<V> newFailedFuture(Throwable cause) {
+      return null;
+   }
+
+   @Override
+   public boolean isShuttingDown() {
+      return false;
+   }
+
+   @Override
+   public Future<?> shutdownGracefully() {
+      return null;
+   }
+
+   @Override
+   public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public Future<?> terminationFuture() {
+      return null;
+   }
+
+   @Override
+   public void shutdown() {
+
+   }
+
+   @Override
+   public List<Runnable> shutdownNow() {
+      return null;
+   }
+
+   @Override
+   public Iterator<EventExecutor> iterator() {
+      return null;
+   }
+
+   @Override
+   public Future<?> submit(Runnable task) {
+      execute(task);
+      return null;
+   }
+
+   @Override
+   public <T> Future<T> submit(Runnable task, T result) {
+      execute(task);
+      return null;
+   }
+
+   @Override
+   public <T> Future<T> submit(Callable<T> task) {
+      return null;
+   }
+
+   @Override
+   public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public boolean isShutdown() {
+      return false;
+   }
+
+   @Override
+   public boolean isTerminated() {
+      return false;
+   }
+
+   @Override
+   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return false;
+   }
+
+   @Override
+   public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+      return null;
+   }
+
+   @Override
+   public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+                                                             long timeout,
+                                                             TimeUnit unit) throws InterruptedException {
+      return null;
+   }
+
+   @Override
+   public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+      return null;
+   }
+
+   @Override
+   public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+                          long timeout,
+                          TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return null;
+   }
+
+   @Override
+   public void execute(Runnable command) {
+      executor.execute(command);
+   }
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 694c1d3..2f730fb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -16,22 +16,24 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 
+import javax.security.auth.Subject;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.security.auth.Subject;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.EventLoop;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
@@ -46,9 +48,6 @@ import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.TransportInternal;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 public class ProtonHandler extends ProtonInitializable implements SaslListener {
 
    private static final Logger log = Logger.getLogger(ProtonHandler.class);
@@ -68,8 +67,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
    private ServerSASL chosenMechanism;
    private ClientSASL clientSASLMechanism;
 
-   private final ReentrantLock lock = new ReentrantLock();
-
    private final long creationTime;
 
    private final boolean isServer;
@@ -80,17 +77,20 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
 
    protected boolean receivedFirstPacket = false;
 
-   private final Executor flushExecutor;
+   private final EventLoop workerExecutor;
+
+   private final ArtemisExecutor poolExecutor;
 
    protected final ReadyListener readyListener;
 
    boolean inDispatch = false;
 
-   public ProtonHandler(Executor flushExecutor, boolean isServer) {
-      this.flushExecutor = flushExecutor;
-      this.readyListener = () -> this.flushExecutor.execute(() -> {
-         flush();
-      });
+   boolean scheduledFlush = false;
+
+   public ProtonHandler(EventLoop workerExecutor, ArtemisExecutor poolExecutor, boolean isServer) {
+      this.workerExecutor = workerExecutor;
+      this.poolExecutor = poolExecutor;
+      this.readyListener = () -> runLater(this::flush);
       this.creationTime = System.currentTimeMillis();
       this.isServer = isServer;
 
@@ -106,45 +106,33 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
    }
 
    public Long tick(boolean firstTick) {
-      if (firstTick) {
-         // the first tick needs to guarantee a lock here
-         lock.lock();
-      } else {
-         if (!lock.tryLock()) {
-            log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly");
-            // if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here
-            return null;
-         }
-      }
-      try {
-         if (!firstTick) {
-            try {
-               if (connection.getLocalState() != EndpointState.CLOSED) {
-                  long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-                  if (transport.isClosed()) {
-                     throw new IllegalStateException("Channel was inactive for to long");
-                  }
-                  return rescheduleAt;
+      requireHandler();
+      if (!firstTick) {
+         try {
+            if (connection.getLocalState() != EndpointState.CLOSED) {
+               long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+               if (transport.isClosed()) {
+                  throw new IllegalStateException("Channel was inactive for to long");
                }
-            } catch (Exception e) {
-               log.warn(e.getMessage(), e);
-               transport.close();
-               connection.setCondition(new ErrorCondition());
+               return rescheduleAt;
             }
-            return 0L;
+         } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+            transport.close();
+            connection.setCondition(new ErrorCondition());
+         } finally {
+            flush();
          }
-         return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-      } finally {
-         lock.unlock();
-         flushBytes();
+         return 0L;
       }
+      return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
    }
 
    /**
     * We cannot flush until the initial handshake was finished.
     * If this happens before the handshake, the connection response will happen without SASL
     * and the client will respond and fail with an invalid code.
-    * */
+    */
    public void scheduledFlush() {
       if (receivedFirstPacket) {
          flush();
@@ -152,29 +140,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
    }
 
    public int capacity() {
-      lock.lock();
-      try {
-         return transport.capacity();
-      } finally {
-         lock.unlock();
-      }
-   }
-
-   public void lock() {
-      lock.lock();
+      requireHandler();
+      return transport.capacity();
    }
 
-   public void unlock() {
-      lock.unlock();
-   }
-
-   public boolean tryLock(long time, TimeUnit timeUnit) {
-      try {
-         return lock.tryLock(time, timeUnit);
-      } catch (InterruptedException e) {
-
-         Thread.currentThread().interrupt();
-         return false;
+   public void requireHandler() {
+      if (!workerExecutor.inEventLoop()) {
+         new Exception("saco!!!").printStackTrace();
+         // this should not happen unless there is an obvious programming error
+         log.warn("Using inHandler is required", new Exception("trace"));
+         System.exit(-1);
+         throw new IllegalStateException("this method requires to be called within the handler, use the executor");
       }
    }
 
@@ -192,21 +168,34 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
    }
 
    public void createServerSASL(String[] mechanisms) {
+      requireHandler();
       Sasl sasl = transport.sasl();
       sasl.server();
       sasl.setMechanisms(mechanisms);
       sasl.setListener(this);
    }
 
+
+
    public void flushBytes() {
+      requireHandler();
+
+      if (!scheduledFlush) {
+         scheduledFlush = true;
+         workerExecutor.execute(this::actualFlush);
+      }
+   }
+
+   private void actualFlush() {
+      requireHandler();
 
       for (EventHandler handler : handlers) {
          if (!handler.flowControl(readyListener)) {
+            scheduledFlush = false;
             return;
          }
       }
 
-      lock.lock();
       try {
          while (true) {
             ByteBuffer head = transport.head();
@@ -227,7 +216,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
             transport.pop(pending);
          }
       } finally {
-         lock.unlock();
+         scheduledFlush = false;
       }
    }
 
@@ -236,36 +225,32 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
    }
 
    public void inputBuffer(ByteBuf buffer) {
+      requireHandler();
       dataReceived = true;
-      lock.lock();
-      try {
-         while (buffer.readableBytes() > 0) {
-            int capacity = transport.capacity();
+      while (buffer.readableBytes() > 0) {
+         int capacity = transport.capacity();
 
-            if (!receivedFirstPacket) {
-               handleFirstPacket(buffer);
-               // there is a chance that if SASL Handshake has been carried out that the capacity may change.
-               capacity = transport.capacity();
-            }
+         if (!receivedFirstPacket) {
+            handleFirstPacket(buffer);
+            // there is a chance that if SASL Handshake has been carried out that the capacity may change.
+            capacity = transport.capacity();
+         }
 
-            if (capacity > 0) {
-               ByteBuffer tail = transport.tail();
-               int min = Math.min(capacity, buffer.readableBytes());
-               tail.limit(min);
-               buffer.readBytes(tail);
+         if (capacity > 0) {
+            ByteBuffer tail = transport.tail();
+            int min = Math.min(capacity, buffer.readableBytes());
+            tail.limit(min);
+            buffer.readBytes(tail);
 
-               flush();
+            flush();
+         } else {
+            if (capacity == 0) {
+               log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
             } else {
-               if (capacity == 0) {
-                  log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
-               } else {
-                  log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
-               }
-               break;
+               log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
             }
+            break;
          }
-      } finally {
-         lock.unlock();
       }
    }
 
@@ -281,29 +266,55 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
       return creationTime;
    }
 
+   public void runOnPool(Runnable runnable) {
+      poolExecutor.execute(runnable);
+   }
+
+   public void runNow(Runnable runnable) {
+      if (workerExecutor.inEventLoop()) {
+         runnable.run();
+      } else {
+         workerExecutor.execute(runnable);
+      }
+   }
+
+   public void runLater(Runnable runnable) {
+      workerExecutor.execute(runnable);
+   }
+
    public void flush() {
-      lock.lock();
-      try {
+      if (workerExecutor.inEventLoop()) {
          transport.process();
-      } finally {
-         lock.unlock();
+         dispatch();
+      } else {
+         runLater(() -> {
+            transport.process();
+            dispatch();
+         });
       }
-
-      dispatch();
    }
 
-   public void close(ErrorCondition errorCondition) {
-      lock.lock();
-      try {
+   public void close(ErrorCondition errorCondition, AMQPConnectionContext connectionContext) {
+      runNow(() -> {
          if (errorCondition != null) {
             connection.setCondition(errorCondition);
          }
          connection.close();
-      } finally {
-         lock.unlock();
-      }
+         flush();
+      });
 
-      flush();
+      /*try {
+         Thread.sleep(1000);
+      } catch (Exception e) {
+         e.printStackTrace();
+      } */
+      // this needs to be done in two steps
+      // we first flush what we have to the client
+      // after flushed, we close the local connection
+      // otherwise this could close the netty connection before the Writable is complete
+      runLater(() -> {
+         connectionContext.getConnectionCallback().getTransportConnection().close();
+      });
    }
 
    // server side SASL Listener
@@ -462,45 +473,59 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
    private void dispatch() {
       Event ev;
 
-      lock.lock();
+      if (inDispatch) {
+         // Avoid recursion from events
+         return;
+      }
       try {
-         if (inDispatch) {
-            // Avoid recursion from events
-            return;
-         }
-         try {
-            inDispatch = true;
-            while ((ev = collector.peek()) != null) {
-               for (EventHandler h : handlers) {
-                  if (log.isTraceEnabled()) {
-                     log.trace("Handling " + ev + " towards " + h);
-                  }
-                  try {
-                     Events.dispatch(ev, h);
-                  } catch (Exception e) {
-                     log.warn(e.getMessage(), e);
-                     ErrorCondition error = new ErrorCondition();
-                     error.setCondition(AmqpError.INTERNAL_ERROR);
-                     error.setDescription("Unrecoverable error: " +
-                        (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
-                     connection.setCondition(error);
-                     connection.close();
-                  }
+         inDispatch = true;
+         while ((ev = collector.peek()) != null) {
+            for (EventHandler h : handlers) {
+               if (log.isTraceEnabled()) {
+                  log.trace("Handling " + ev + " towards " + h);
+               }
+               try {
+                  Events.dispatch(ev, h);
+               } catch (Exception e) {
+                  log.warn(e.getMessage(), e);
+                  ErrorCondition error = new ErrorCondition();
+                  error.setCondition(AmqpError.INTERNAL_ERROR);
+                  error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
+                  connection.setCondition(error);
+                  connection.close();
                }
-
-               collector.pop();
             }
 
-         } finally {
-            inDispatch = false;
+            collector.pop();
          }
+
       } finally {
-         lock.unlock();
+         inDispatch = false;
       }
 
       flushBytes();
    }
 
+
+   public void handleError(Exception e) {
+      if (workerExecutor.inEventLoop()) {
+         internalHandlerError(e);
+      } else {
+         runLater(() -> internalHandlerError(e));
+      }
+   }
+
+   private void internalHandlerError(Exception e) {
+      log.warn(e.getMessage(), e);
+      ErrorCondition error = new ErrorCondition();
+      error.setCondition(AmqpError.INTERNAL_ERROR);
+      error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
+      connection.setCondition(error);
+      connection.close();
+      flush();
+   }
+
+
    public void open(String containerId, Map<Symbol, Object> connectionProperties) {
       this.transport.open();
       this.connection.setContainer(containerId);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 78a5b33..15803f4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -107,14 +107,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             IOCallback ioAction = new IOCallback() {
                @Override
                public void done() {
-                  connection.lock();
-                  try {
+                  connection.runLater(() -> {
                      delivery.settle();
                      delivery.disposition(declared);
-                  } finally {
-                     connection.unlock();
                      connection.flush();
-                  }
+                  });
                }
 
                @Override
@@ -133,15 +130,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             IOCallback ioAction = new IOCallback() {
                @Override
                public void done() {
-                  connection.lock();
-                  try {
+                  connection.runLater(() -> {
                      delivery.settle();
                      delivery.disposition(new Accepted());
                      currentTx = null;
-                  } finally {
-                     connection.unlock();
                      connection.flush();
-                  }
+                  });
                }
 
                @Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index ab4ff42..4c5a887 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -25,11 +25,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.qpid.proton.engine.Delivery;
 
-
 /**
  * AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled
  * or not.  This class extends the Core TransactionImpl used for normal TX behaviour.  In the case where deliveries
@@ -46,8 +48,22 @@ public class ProtonTransactionImpl extends TransactionImpl {
 
    private boolean discharged;
 
-   public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
+   public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
       super(xid, storageManager, timeoutSeconds);
+      addOperation(new TransactionOperationAbstract() {
+         @Override
+         public void afterCommit(Transaction tx) {
+            super.afterCommit(tx);
+            connection.runNow(() -> {
+               // Settle all unsettled deliveries if commit is successful
+               for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
+                  if (!p.getA().isSettled())
+                     p.getB().settle(p.getA());
+               }
+               connection.flush();
+            });
+         }
+      });
    }
 
    @Override
@@ -71,11 +87,6 @@ public class ProtonTransactionImpl extends TransactionImpl {
    @Override
    public void commit() throws Exception {
       super.commit();
-
-      // Settle all unsettled deliveries if commit is successful
-      for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
-         if (!p.getA().isSettled()) p.getB().settle(p.getA());
-      }
    }
 
    public boolean isDischarged() {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
index 30814a9..349c32d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -16,13 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.never;
-
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -34,29 +27,65 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
+
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
 
 public class AMQPSessionCallbackTest {
 
-   @Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+   @Rule
+   public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+   @Mock
+   private AMQPConnectionCallback protonSPI;
+   @Mock
+   private ProtonProtocolManager manager;
+   @Mock
+   private AMQPConnectionContext connection;
+   @Mock
+   private Connection transportConnection;
+   @Mock
+   private Executor executor;
+   @Mock
+   private OperationContext operationContext;
+   @Mock
+   private Receiver receiver;
+   @Mock
+   private ActiveMQServer server;
+   @Mock
+   private PagingManager pagingManager;
+   @Mock
+   private PagingStore pagingStore;
+
+
+   @Before
+   public void setRule() {
+
+      // The connection will call the runnable now on this mock, as these would happen on a different thread.
+      Mockito.doAnswer(new Answer() {
+         @Override
+         public Void answer(InvocationOnMock invocation) throws Throwable {
+            ((Runnable) invocation.getArguments()[0]).run();
+            return null;
+         }
+      }).when(connection).runNow(Mockito.isA(Runnable.class));
 
-   @Mock private AMQPConnectionCallback protonSPI;
-   @Mock private ProtonProtocolManager manager;
-   @Mock private AMQPConnectionContext connection;
-   @Mock private Connection transportConnection;
-   @Mock private Executor executor;
-   @Mock private OperationContext operationContext;
-   @Mock private Receiver receiver;
-   @Mock private ActiveMQServer server;
-   @Mock private PagingManager pagingManager;
-   @Mock private PagingStore pagingStore;
+   }
 
    /**
     * Test that the AMQPSessionCallback grants no credit when not at threshold
@@ -69,8 +98,7 @@ public class AMQPSessionCallbackTest {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is above threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
@@ -100,8 +128,7 @@ public class AMQPSessionCallbackTest {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@@ -132,8 +159,7 @@ public class AMQPSessionCallbackTest {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is above threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
@@ -164,8 +190,7 @@ public class AMQPSessionCallbackTest {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@@ -195,8 +220,7 @@ public class AMQPSessionCallbackTest {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@@ -227,8 +251,7 @@ public class AMQPSessionCallbackTest {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 2402d09..e05a9af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
@@ -31,7 +32,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 import org.jboss.logging.Logger;
 
-public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference {
+public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference, Runnable {
 
    private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class);
 
@@ -74,6 +75,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private long messageSize = -1;
 
+   private MessageReferenceCallback callback;
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -90,6 +93,23 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
    }
 
    @Override
+   public void setCallback(MessageReferenceCallback callback) {
+      this.callback = callback;
+   }
+
+   @Override
+   public void run() {
+      MessageReferenceCallback callback = this.callback;
+
+      try {
+         if (callback != null) {
+            callback.executeDelivery(this);
+         }
+      } finally {
+         this.callback = null;
+      }
+   }
+   @Override
    public synchronized PagedMessage getPagedMessage() {
       PagedMessage returnMessage = message != null ? message.get() : null;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
index f1e83d2..bd6b705 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
@@ -26,6 +26,10 @@ import org.apache.activemq.artemis.core.server.group.UnproposalListener;
 
 public interface Binding extends UnproposalListener {
 
+   default boolean isLocal() {
+      return false;
+   }
+
    SimpleString getAddress();
 
    Bindable getBindable();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
index 30a2680..053acfa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
@@ -26,6 +26,9 @@ import org.apache.activemq.artemis.core.server.group.UnproposalListener;
 
 public interface Bindings extends UnproposalListener {
 
+   // this is to inform the parent there was an udpate on the bindings
+   void updated(QueueBinding binding);
+
    Collection<Binding> getBindings();
 
    void addBinding(Binding binding);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 56abddb..9cd6f73 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -26,12 +26,14 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -63,6 +65,13 @@ public final class BindingsImpl implements Bindings {
 
    private final SimpleString name;
 
+   private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);
+
+   /**
+    * This has a version about adds and removes
+    */
+   private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());
+
    public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
       this.groupingHandler = groupingHandler;
       this.name = name;
@@ -92,61 +101,78 @@ public final class BindingsImpl implements Bindings {
 
    @Override
    public void addBinding(final Binding binding) {
-      if (logger.isTraceEnabled()) {
-         logger.trace("addBinding(" + binding + ") being called");
-      }
-      if (binding.isExclusive()) {
-         exclusiveBindings.add(binding);
-      } else {
-         SimpleString routingName = binding.getRoutingName();
+      try {
+         if (logger.isTraceEnabled()) {
+            logger.trace("addBinding(" + binding + ") being called");
+         }
+         if (binding.isExclusive()) {
+            exclusiveBindings.add(binding);
+         } else {
+            SimpleString routingName = binding.getRoutingName();
 
-         List<Binding> bindings = routingNameBindingMap.get(routingName);
+            List<Binding> bindings = routingNameBindingMap.get(routingName);
 
-         if (bindings == null) {
-            bindings = new CopyOnWriteArrayList<>();
+            if (bindings == null) {
+               bindings = new CopyOnWriteArrayList<>();
 
-            List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
+               List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
+
+               if (oldBindings != null) {
+                  bindings = oldBindings;
+               }
+            }
 
-            if (oldBindings != null) {
-               bindings = oldBindings;
+            if (!bindings.contains(binding)) {
+               bindings.add(binding);
             }
          }
 
-         if (!bindings.contains(binding)) {
-            bindings.add(binding);
+         bindingsMap.put(binding.getID(), binding);
+
+         if (logger.isTraceEnabled()) {
+            logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
          }
+      } finally {
+         updated();
       }
 
-      bindingsMap.put(binding.getID(), binding);
+   }
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
-      }
+   @Override
+   public void updated(QueueBinding binding) {
+      updated();
+   }
 
+   private void updated() {
+      version.set(sequenceVersion.incrementAndGet());
    }
 
    @Override
    public void removeBinding(final Binding binding) {
-      if (binding.isExclusive()) {
-         exclusiveBindings.remove(binding);
-      } else {
-         SimpleString routingName = binding.getRoutingName();
+      try {
+         if (binding.isExclusive()) {
+            exclusiveBindings.remove(binding);
+         } else {
+            SimpleString routingName = binding.getRoutingName();
 
-         List<Binding> bindings = routingNameBindingMap.get(routingName);
+            List<Binding> bindings = routingNameBindingMap.get(routingName);
 
-         if (bindings != null) {
-            bindings.remove(binding);
+            if (bindings != null) {
+               bindings.remove(binding);
 
-            if (bindings.isEmpty()) {
-               routingNameBindingMap.remove(routingName);
+               if (bindings.isEmpty()) {
+                  routingNameBindingMap.remove(routingName);
+               }
             }
          }
-      }
 
-      bindingsMap.remove(binding.getID());
+         bindingsMap.remove(binding.getID());
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
+         if (logger.isTraceEnabled()) {
+            logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
+         }
+      } finally {
+         updated();
       }
    }
 
@@ -267,11 +293,9 @@ public final class BindingsImpl implements Bindings {
 
          if (binding.getFilter() == null || binding.getFilter().match(message)) {
             binding.getBindable().route(message, context);
-
             routed = true;
          }
       }
-
       if (!routed) {
          // Remove the ids now, in order to avoid double check
          ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
@@ -280,30 +304,53 @@ public final class BindingsImpl implements Bindings {
          SimpleString groupId = message.getGroupID();
 
          if (ids != null) {
+            context.clear();
             routeFromCluster(message, context, ids);
          } else if (groupingHandler != null && groupRouting && groupId != null) {
+            context.clear();
             routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
          } else {
-            if (logger.isTraceEnabled()) {
-               logger.trace("Routing message " + message + " on binding=" + this);
+            // in a optimization, we are reusing the previous context if everything is right for it
+            // so the simpleRouting will only happen if neededk
+            if (!context.isReusable(message, version.get())) {
+               context.clear();
+               simpleRouting(message, context);
             }
-            for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-               SimpleString routingName = entry.getKey();
+         }
+      }
+   }
 
-               List<Binding> bindings = entry.getValue();
+   private void simpleRouting(Message message, RoutingContext context) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Routing message " + message + " on binding=" + this);
+      }
 
-               if (bindings == null) {
-                  // The value can become null if it's concurrently removed while we're iterating - this is expected
-                  // ConcurrentHashMap behaviour!
-                  continue;
-               }
+      // We check at the version before we started routing,
+      // this is because if something changed in between we want to check the correct version
+      int currentVersion = version.get();
 
-               Binding theBinding = getNextBinding(message, routingName, bindings);
+      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
+         SimpleString routingName = entry.getKey();
 
-               if (theBinding != null) {
-                  theBinding.route(message, context);
-               }
-            }
+         List<Binding> bindings = entry.getValue();
+
+         if (bindings == null) {
+            // The value can become null if it's concurrently removed while we're iterating - this is expected
+            // ConcurrentHashMap behaviour!
+            continue;
+         }
+
+         Binding theBinding = getNextBinding(message, routingName, bindings);
+
+         if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
+            context.setReusable(true, currentVersion);
+         } else {
+            // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
+            context.setReusable(false, currentVersion);
+         }
+
+         if (theBinding != null) {
+            theBinding.route(message, context);
          }
       }
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 79af5d0..79ab4d3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -47,6 +47,11 @@ public class LocalQueueBinding implements QueueBinding {
    }
 
    @Override
+   public boolean isLocal() {
+      return true;
+   }
+
+   @Override
    public long getID() {
       return queue.getID();
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 5346d6c..bf12baf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -485,82 +485,91 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             return null;
          }
 
-         final Queue queue = queueBinding.getQueue();
+         Bindings bindingsOnQueue = addressManager.getBindingsForRoutingAddress(queueBinding.getAddress());
 
-         boolean changed = false;
+         try {
 
-         //validate update
-         if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
-            final int consumerCount = queue.getConsumerCount();
-            if (consumerCount > maxConsumers) {
-               throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
+            final Queue queue = queueBinding.getQueue();
+
+            boolean changed = false;
+
+            //validate update
+            if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
+               final int consumerCount = queue.getConsumerCount();
+               if (consumerCount > maxConsumers) {
+                  throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
+               }
             }
-         }
-         if (routingType != null) {
-            final SimpleString address = queue.getAddress();
-            final AddressInfo addressInfo = addressManager.getAddressInfo(address);
-            final EnumSet<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
-            if (!addressRoutingTypes.contains(routingType)) {
-               throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
+            if (routingType != null) {
+               final SimpleString address = queue.getAddress();
+               final AddressInfo addressInfo = addressManager.getAddressInfo(address);
+               final EnumSet<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
+               if (!addressRoutingTypes.contains(routingType)) {
+                  throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
+               }
             }
-         }
 
-         //atomic update
-         if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
-            changed = true;
-            queue.setMaxConsumer(maxConsumers);
-         }
-         if (routingType != null && queue.getRoutingType() != routingType) {
-            changed = true;
-            queue.setRoutingType(routingType);
-         }
-         if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
-            changed = true;
-            queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
-         }
-         if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) {
-            changed = true;
-            queue.setExclusive(exclusive);
-         }
-         if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
-            changed = true;
-            queue.setNonDestructive(nonDestructive);
-         }
-         if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
-            changed = true;
-            queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
-         }
-         if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) {
-            changed = true;
-            queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
-         }
-         if (filter != null && !filter.equals(queue.getFilter())) {
-            changed = true;
-            queue.setFilter(filter);
-         }
-         if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
-            changed = true;
-            queue.setConfigurationManaged(configurationManaged);
-         }
-         if (logger.isDebugEnabled()) {
-            if (user == null && queue.getUser() != null) {
-               logger.debug("Ignoring updating Queue to a NULL user");
+            //atomic update
+            if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
+               changed = true;
+               queue.setMaxConsumer(maxConsumers);
+            }
+            if (routingType != null && queue.getRoutingType() != routingType) {
+               changed = true;
+               queue.setRoutingType(routingType);
+            }
+            if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
+               changed = true;
+               queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
+            }
+            if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) {
+               changed = true;
+               queue.setExclusive(exclusive);
+            }
+            if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
+               changed = true;
+               queue.setNonDestructive(nonDestructive);
+            }
+            if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
+               changed = true;
+               queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
+            }
+            if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) {
+               changed = true;
+               queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
+            }
+            if (filter != null && !filter.equals(queue.getFilter())) {
+               changed = true;
+               queue.setFilter(filter);
+            }
+            if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
+               changed = true;
+               queue.setConfigurationManaged(configurationManaged);
+            }
+            if (logger.isDebugEnabled()) {
+               if (user == null && queue.getUser() != null) {
+                  logger.debug("Ignoring updating Queue to a NULL user");
+               }
+            }
+            if (user != null && !user.equals(queue.getUser())) {
+               changed = true;
+               queue.setUser(user);
             }
-         }
-         if (user != null && !user.equals(queue.getUser())) {
-            changed = true;
-            queue.setUser(user);
-         }
 
-         if (changed) {
-            final long txID = storageManager.generateID();
-            try {
-               storageManager.updateQueueBinding(txID, queueBinding);
-               storageManager.commitBindings(txID);
-            } catch (Throwable throwable) {
-               storageManager.rollback(txID);
-               logger.warn(throwable.getMessage(), throwable);
-               throw throwable;
+            if (changed) {
+               final long txID = storageManager.generateID();
+               try {
+                  storageManager.updateQueueBinding(txID, queueBinding);
+                  storageManager.commitBindings(txID);
+               } catch (Throwable throwable) {
+                  storageManager.rollback(txID);
+                  logger.warn(throwable.getMessage(), throwable);
+                  throw throwable;
+               }
+            }
+         } finally {
+            if (bindingsOnQueue != null) {
+               bindingsOnQueue.updated(queueBinding);
             }
          }
 
@@ -876,6 +885,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       AddressInfo addressInfo = addressManager.getAddressInfo(address);
 
       if (bindingMove != null) {
+         context.clear();
          bindingMove.route(message, context);
          if (addressInfo != null) {
             addressInfo.incrementRoutedMessageCount();
@@ -1341,7 +1351,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
             @Override
             public void done() {
-               addReferences(refs, direct);
+               context.processReferences(refs, direct);
             }
          });
       }
@@ -1476,16 +1486,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       return true;
    }
 
-   /**
-    * @param refs
-    */
-   private void addReferences(final List<MessageReference> refs, final boolean direct) {
-      for (MessageReference ref : refs) {
-         ref.getQueue().addTail(ref, direct);
-      }
-   }
-
-   /**
+  /**
     * The expiry scanner can't be started until the whole server has been started other wise you may get races
     */
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 09d67ad..87a3c30 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -263,7 +263,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
 
          Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>();
          for (Entry<String, ProtocolManagerFactory> entry : selectedProtocolFactories.entrySet()) {
-            selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
+            selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getCombinedParams(), incomingInterceptors, outgoingInterceptors));
          }
 
          acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 43bed88..4b5509e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1048,7 +1048,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222151, value = "removing consumer which did not handle a message, consumer={0}, message={1}",
       format = Message.Format.MESSAGE_FORMAT)
-   void removingBadConsumer(@Cause Throwable e, Consumer consumer, MessageReference reference);
+   void removingBadConsumer(@Cause Throwable e, Consumer consumer, Object reference);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222152, value = "Unable to decrement reference counting on queue",
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 6df4889..1dfff29 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -46,6 +46,10 @@ public interface Consumer {
     */
    HandleStatus handle(MessageReference reference) throws Exception;
 
+   /** wakes up internal threads to deliver more messages */
+   default void promptDelivery() {
+   }
+
    /**
     * This will proceed with the actual delivery.
     * Notice that handle should hold a readLock and proceedDelivery should release the readLock
@@ -80,4 +84,8 @@ public interface Consumer {
 
    /** an unique sequential ID for this consumer */
    long sequentialID();
+
+   default void errorProcessing(Throwable e, MessageReference reference) {
+
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 2e2fb8d..886af36 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -44,6 +44,8 @@ public interface MessageReference {
 
    SimpleString getLastValueProperty();
 
+   void setCallback(MessageReferenceCallback callback);
+
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,
     * so we need to perform some extra steps on paging.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java
new file mode 100644
index 0000000..4804dde
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in cases where a message delivery happens on an executor.
+ *  Most MessageReference implementations will allow execution, and if it does,
+ *  and the protocol requires an execution per message, this callback may be used.
+ *
+ *  At the time of this implementation only AMQP was used. */
+public interface MessageReferenceCallback {
+   void executeDelivery(MessageReference reference);
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 8a120ea..031d01a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -53,6 +53,10 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void setRoutingType(RoutingType routingType);
 
+   /** the current queue and consumer settings will allow use of the Reference Execution and callback.
+    *  This is because  */
+   boolean allowsReferenceCallback();
+
    boolean isDurable();
 
    /**
@@ -392,4 +396,8 @@ public interface Queue extends Bindable,CriticalComponent {
    /** This is to perform a check on the counter again */
    void recheckRefCount(OperationContext context);
 
+   default void errorProcessing(Consumer consumer, Throwable t, MessageReference messageReference) {
+
+   }
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 9b09256..151aa41 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -26,6 +26,24 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 
 public interface RoutingContext {
 
+   /*
+     This will return true if the RoutingContext can be reused
+     false if it cannot
+     null, if we don't know.
+
+
+     Once false, it can't be set to true
+   */
+   boolean isReusable();
+
+   int getPreviousBindingsVersion();
+
+   SimpleString getPreviousAddress();
+
+   void setReusable(boolean reusable);
+
+   RoutingContext setReusable(boolean reusable, int version);
+
    Transaction getTransaction();
 
    void setTransaction(Transaction transaction);
@@ -54,5 +72,16 @@ public interface RoutingContext {
 
    SimpleString getAddress(Message message);
 
+   SimpleString getAddress();
+
    RoutingType getRoutingType();
+
+   RoutingType getPreviousRoutingType();
+
+   void processReferences(List<MessageReference> refs, boolean direct);
+
+   boolean isReusable(Message message, int version);
+
+
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index eb05563..4d35919 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -31,6 +31,10 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
 
    void fireSlowConsumer();
 
+   /** the current queue settings will allow use of the Reference Execution and callback.
+    *  This is because  */
+   boolean allowReferenceCallback();
+
    /**
     * this is to be used with anything specific on a protocol head.
     */
@@ -105,6 +109,4 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
    long getCreationTime();
 
    String getSessionID();
-
-   void promptDelivery();
 }
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 37442b2..cfc3e01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -22,6 +22,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.core.Message;
@@ -43,6 +44,8 @@ public interface ServerSession extends SecurityAuth {
 
    Object getConnectionID();
 
+   Executor getSessionExecutor();
+
    /**
     * Certain protocols may create an internal session that shouldn't go through security checks.
     * make sure you don't expose this property through any protocol layer as that would be a security breach
@@ -241,12 +244,26 @@ public interface ServerSession extends SecurityAuth {
                       boolean direct,
                       boolean noAutoCreateQueue) throws Exception;
 
+   RoutingStatus send(Transaction tx,
+                      Message message,
+                      boolean direct,
+                      boolean noAutoCreateQueue,
+                      RoutingContext routingContext) throws Exception;
+
+
    RoutingStatus doSend(Transaction tx,
                         Message msg,
                         SimpleString originalAddress,
                         boolean direct,
                         boolean noAutoCreateQueue) throws Exception;
 
+   RoutingStatus doSend(Transaction tx,
+                        Message msg,
+                        SimpleString originalAddress,
+                        boolean direct,
+                        boolean noAutoCreateQueue,
+                        RoutingContext routingContext) throws Exception;
+
    RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
 
    RoutingStatus send(Message message, boolean direct) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 315b926..2fd70b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -146,6 +147,11 @@ public class LastValueQueue extends QueueImpl {
       }
    }
 
+   @Override
+   public boolean allowsReferenceCallback() {
+      return false;
+   }
+
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
@@ -231,6 +237,11 @@ public class LastValueQueue extends QueueImpl {
          this.ref = ref;
       }
 
+      @Override
+      public void setCallback(MessageReferenceCallback callback) {
+         // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
+      }
+
       MessageReference getReference() {
          return ref;
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 2401c4a..6d32c46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -30,7 +31,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 /**
  * Implementation of a MessageReference
  */
-public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
+public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
 
    private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
       .newUpdater(MessageReferenceImpl.class, "deliveryCount");
@@ -54,6 +55,8 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    private Object protocolData;
 
+   private MessageReferenceCallback callback;
+
    // Static --------------------------------------------------------
 
    private static final int memoryOffset = 64;
@@ -85,6 +88,24 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    // MessageReference implementation -------------------------------
 
    @Override
+   public void setCallback(MessageReferenceCallback callback) {
+      this.callback = callback;
+   }
+
+   @Override
+   public void run() {
+      MessageReferenceCallback callback = this.callback;
+
+      try {
+         if (callback != null) {
+            callback.executeDelivery(this);
+         }
+      } finally {
+         this.callback = null;
+      }
+   }
+
+   @Override
    public Object getProtocolData() {
       return protocolData;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 292cfc1..1752eb2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -535,6 +535,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    // Bindable implementation -------------------------------------------------------------------------------------
 
+   @Override
+   public boolean allowsReferenceCallback() {
+      // non descructive queues will reuse the same reference between multiple consumers
+      // so you cannot really use the callback from the MessageReference
+      return !nonDestructive;
+   }
+
    public SimpleString getRoutingName() {
       return name;
    }
@@ -627,8 +634,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void route(final Message message, final RoutingContext context) throws Exception {
-      if (purgeOnNoConsumers && getConsumerCount() == 0) {
-         return;
+      if (purgeOnNoConsumers) {
+         context.setReusable(false);
+         if (getConsumerCount() == 0) {
+            return;
+         }
       }
       context.addQueue(address, this);
    }
@@ -849,11 +859,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   // Go into direct delivery mode
                   directDeliver = supportsDirectDeliver;
                   if (logger.isTraceEnabled()) {
-                     logger.trace("Setting direct deliverer to " + supportsDirectDeliver);
+                     logger.trace("Setting direct deliverer to " + supportsDirectDeliver + " on queue " + this.getName());
                   }
                } else {
                   if (logger.isTraceEnabled()) {
-                     logger.trace("Couldn't set direct deliver back");
+                     logger.trace("Couldn't set direct deliver back on queue " + this.getName());
                   }
                }
             }
@@ -1414,6 +1424,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
       if (nonDestructive && reason == AckReason.NORMAL) {
+         decDelivering(ref);
          if (logger.isDebugEnabled()) {
             logger.debug("acknowledge ignored nonDestructive=true and reason=NORMAL");
          }
@@ -3141,6 +3152,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                break;
             }
          }
+
+         if (logger.isTraceEnabled()) {
+            logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
+         }
          return false;
       }
    }
@@ -3160,24 +3175,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       try {
          consumer.proceedDeliver(reference);
       } catch (Throwable t) {
-         ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
-
-         synchronized (this) {
-            // If the consumer throws an exception we remove the consumer
-            try {
-               removeConsumer(consumer);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
-            }
-
-            // The message failed to be delivered, hence we try again
-            addHead(reference, false);
-         }
+         errorProcessing(consumer, t, reference);
       } finally {
          deliveriesInTransit.countDown();
       }
    }
 
+   /** This will print errors and decide what to do with the errored consumer from the protocol layer. */
+   @Override
+   public void errorProcessing(Consumer consumer, Throwable t, MessageReference reference) {
+      synchronized (this) {
+         ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
+         // If the consumer throws an exception we remove the consumer
+         try {
+            removeConsumer(consumer);
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
+         }
+
+         // The message failed to be delivered, hence we try again
+         addHead(reference, false);
+      }
+   }
+
    private boolean checkExpired(final MessageReference reference) {
       try {
          if (reference.getMessage().isExpired()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 29a70e4..b5b36bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -20,9 +20,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -40,19 +42,68 @@ public final class RoutingContextImpl implements RoutingContext {
 
    private SimpleString address;
 
+   private SimpleString previousAddress;
+
+   private RoutingType previousRoutingType;
+
    private RoutingType routingType;
 
+   Boolean reusable = null;
+
+   volatile int version;
+
+   private final Executor executor;
+
    public RoutingContextImpl(final Transaction transaction) {
+      this(transaction, null);
+   }
+
+   public RoutingContextImpl(final Transaction transaction, Executor executor) {
       this.transaction = transaction;
+      this.executor = executor;
    }
 
    @Override
-   public void clear() {
-      transaction = null;
+   public boolean isReusable() {
+      return reusable != null && reusable;
+   }
+
+   @Override
+   public int getPreviousBindingsVersion() {
+      return version;
+   }
+
+   @Override
+   public SimpleString getPreviousAddress() {
+      return previousAddress;
+   }
+
+   @Override
+   public void setReusable(boolean reusable) {
+      this.reusable = reusable;
+   }
+   @Override
+   public RoutingContext setReusable(boolean reusable, int previousBindings) {
+      this.version = previousBindings;
+      this.previousAddress = address;
+      this.previousRoutingType = routingType;
+      if (this.reusable != null && !this.reusable.booleanValue()) {
+         // cannot set to Reusable once it was set to false
+         return this;
+      }
+      this.reusable = reusable;
+      return this;
+   }
 
+   @Override
+   public void clear() {
       map.clear();
 
       queueCount = 0;
+
+      this.version = 0;
+
+      this.reusable = null;
    }
 
    @Override
@@ -70,6 +121,18 @@ public final class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
+   public void processReferences(final List<MessageReference> refs, final boolean direct) {
+      internalprocessReferences(refs, direct);
+   }
+
+   private void internalprocessReferences(final List<MessageReference> refs, final boolean direct) {
+      for (MessageReference ref : refs) {
+         ref.getQueue().addTail(ref, direct);
+      }
+   }
+
+
+   @Override
    public void addQueueWithAck(SimpleString address, Queue queue) {
       addQueue(address, queue);
       RouteContextList listing = getContextListing(address);
@@ -83,6 +146,11 @@ public final class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
+   public boolean isReusable(Message message, int version) {
+      return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType && getPreviousBindingsVersion() == version;
+   }
+
+   @Override
    public void setAddress(SimpleString address) {
       this.address = address;
    }
@@ -101,11 +169,21 @@ public final class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
+   public SimpleString getAddress() {
+      return address;
+   }
+
+   @Override
    public RoutingType getRoutingType() {
       return routingType;
    }
 
    @Override
+   public RoutingType getPreviousRoutingType() {
+      return previousRoutingType;
+   }
+
+   @Override
    public RouteContextList getContextListing(SimpleString address) {
       RouteContextList listing = map.get(address);
       if (listing == null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index f7a89d7..19e3956 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -257,6 +257,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
 
    @Override
+   public boolean allowReferenceCallback() {
+      if (browseOnly) {
+         return false;
+      } else {
+         return messageQueue.allowsReferenceCallback();
+      }
+   }
+
+   @Override
    public long sequentialID() {
       return sequentialID;
    }
@@ -346,6 +355,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       return callback.supportsDirectDelivery();
    }
 
+   @Override
+   public void errorProcessing(Throwable e, MessageReference deliveryObject) {
+      messageQueue.errorProcessing(this, e, deliveryObject);
+   }
 
    @Override
    public HandleStatus handle(final MessageReference ref) throws Exception {
@@ -582,13 +595,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    public void forceDelivery(final long sequence)  {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
+         MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
+         reference.setDeliveryCount(0);
 
          forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
          forcedDeliveryMessage.setAddress(messageQueue.getName());
 
          applyPrefixForLegacyConsumer(forcedDeliveryMessage);
-         callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
-
+         callback.sendMessage(reference, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
       });
    }
 
@@ -949,7 +963,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       } catch (ActiveMQException e) {
          if (startedTransaction) {
             tx.rollback();
-         } else {
+         } else if (tx != null) {
             tx.markAsRollbackOnly(e);
          }
          throw e;
@@ -958,7 +972,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
          if (startedTransaction) {
             tx.rollback();
-         } else {
+         } else if (tx != null) {
             tx.markAsRollbackOnly(hqex);
          }
          throw hqex;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 3bc60f2..11b096b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.Closeable;
@@ -190,6 +191,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private Set<Closeable> closeables;
 
+   private final Executor sessionExecutor;
+
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -264,6 +267,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       remotingConnection.addFailureListener(this);
       this.context = context;
 
+      this.sessionExecutor = server.getExecutorFactory().getExecutor();
+
       if (!xa) {
          tx = newTransaction();
       }
@@ -284,6 +289,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public Executor getSessionExecutor() {
+      return sessionExecutor;
+   }
+
+   @Override
    public void disableSecurity() {
       this.securityEnabled = false;
    }
@@ -1467,12 +1477,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       return lsm;
    }
 
-
    @Override
    public synchronized RoutingStatus send(Transaction tx,
                                           Message msg,
                                           final boolean direct,
                                           boolean noAutoCreateQueue) throws Exception {
+      return send(tx, msg, direct, noAutoCreateQueue, routingContext);
+   }
+
+   @Override
+   public synchronized RoutingStatus send(Transaction tx,
+                                          Message msg,
+                                          final boolean direct,
+                                          boolean noAutoCreateQueue,
+                                          RoutingContext routingContext) throws Exception {
 
       final Message message;
       if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) {
@@ -1527,7 +1545,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
             result = handleManagementMessage(tx, message, direct);
          } else {
-            result = doSend(tx, message, address, direct, noAutoCreateQueue);
+            result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext);
          }
 
       } catch (Exception e) {
@@ -1766,7 +1784,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          }
          reply.setAddress(replyTo);
 
-         doSend(tx, reply, null, direct, false);
+         doSend(tx, reply, null, direct, false, routingContext);
       }
 
       return RoutingStatus.OK;
@@ -1823,12 +1841,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       theTx.rollback();
    }
 
+
    @Override
    public synchronized RoutingStatus doSend(final Transaction tx,
                                             final Message msg,
                                             final SimpleString originalAddress,
                                             final boolean direct,
                                             final boolean noAutoCreateQueue) throws Exception {
+      return doSend(tx, msg, originalAddress, direct, noAutoCreateQueue, routingContext);
+   }
+
+
+   @Override
+   public synchronized RoutingStatus doSend(final Transaction tx,
+                                            final Message msg,
+                                            final SimpleString originalAddress,
+                                            final boolean direct,
+                                            final boolean noAutoCreateQueue,
+                                            final RoutingContext routingContext) throws Exception {
 
       RoutingStatus result = RoutingStatus.OK;
 
@@ -1861,6 +1891,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
 
       if (tx == null || autoCommitSends) {
+         routingContext.setTransaction(null);
       } else {
          routingContext.setTransaction(tx);
       }
@@ -1880,7 +1911,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             value.getB().incrementAndGet();
          }
       } finally {
-         routingContext.clear();
+         if (!routingContext.isReusable()) {
+            routingContext.clear();
+         }
       }
       return result;
    }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 0ef7804..2e3b691 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -794,6 +794,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public boolean allowsReferenceCallback() {
+         return false;
+      }
+
+      @Override
       public int getConsumersBeforeDispatch() {
          return 0;
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 05e763f..550adbe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -250,7 +250,7 @@ public class AddressingTest extends ActiveMQTestBase {
 
       // there are no consumers so no messages should be routed to the queue
       producer.send(session.createMessage(true));
-      assertEquals(0, queue.getMessageCount());
+      Wait.assertEquals(0, queue::getMessageCount);
    }
 
    @Test
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index 4529efb..cfcbc20 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -138,7 +138,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
       sender.send(message);
       sender.close();
 
-      assertEquals(1, queueView.getMessageCount());
+      Wait.assertEquals(1, queueView::getMessageCount);
 
       // Now try and get the message
       AmqpReceiver receiver = session.createReceiver(getQueueName());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
index 2f65dfb..c6119a1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -74,7 +75,8 @@ public class AmqpFlowControlFailTest extends JMSClientTestSupport {
          }
          receiver.close();
          session2.close();
-         assertEquals(1000, sender.getSender().getCredit());
+
+         Wait.assertEquals(1000, sender.getSender()::getCredit);
          for (int i = 0; i < 1000; i++) {
             final AmqpMessage message = new AmqpMessage();
             byte[] payload = new byte[100];
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index ea62df8..9dc1138 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -47,6 +47,7 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Sender;
 import org.jgroups.util.UUID;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1154,4 +1155,52 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       receiver.close();
       connection.close();
    }
+
+
+
+   @Test(timeout = 60000)
+   public void testReceiveRejecting() throws Exception {
+      final int MSG_COUNT = 1000;
+
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      final String address = getQueueName();
+
+
+      AmqpSender sender = session.createSender(address);
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setMessageId("msg" + i);
+         sender.send(message);
+      }
+
+
+
+      Queue queueView = getProxyToQueue(address);
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         final AmqpReceiver receiver = session.createReceiver(address);
+
+         receiver.flow(MSG_COUNT);
+         AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(received);
+         Assert.assertEquals("msg" + i, received.getMessageId());
+         received.accept();
+         receiver.close();
+      }
+      final AmqpReceiver receiver = session.createReceiver(address);
+      receiver.flow(MSG_COUNT);
+
+      Assert.assertNull(receiver.receive(1, TimeUnit.MILLISECONDS));
+
+
+      Wait.assertEquals(0, queueView::getDeliveringCount);
+
+      connection.close();
+   }
+
+
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index 96d5f1c..e35635d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -766,7 +766,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
          // We should have now drained the Queue
          receiver.flow(1);
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         AmqpMessage message = receiver.receive(1, TimeUnit.SECONDS);
          if (message != null) {
             System.out.println("Read message: " + message.getApplicationProperty("msgId"));
          }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 50ab389..7b1f155 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -324,7 +324,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
          Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue consumerQueue = consumerSession.createQueue(queueName);
          MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
-         TextMessage msg = (TextMessage) consumer.receive(200);
+         TextMessage msg = (TextMessage) consumer.receive(2000);
          assertNotNull(msg);
          consumer.close();
       }
@@ -336,7 +336,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
          Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue consumerQueue = consumerSession.createQueue(queueName);
          MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
-         TextMessage msg = (TextMessage) consumer.receive(200);
+         TextMessage msg = (TextMessage) consumer.receive(2000);
          assertNull(msg);
          consumer.close();
       }
@@ -349,8 +349,8 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
          MessageConsumer consumer = createConsumer(consumerConnection, queueName);
          MessageConsumer consumer2 = createConsumer(consumerConnection2, queueName);
 
-         TextMessage msg = (TextMessage) consumer.receive(200);
-         TextMessage msg2 = (TextMessage) consumer2.receive(200);
+         TextMessage msg = (TextMessage) consumer.receive(2000);
+         TextMessage msg2 = (TextMessage) consumer2.receive(2000);
 
          assertNotNull(msg);
          assertNotNull(msg2);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 1b790a0..58bf2d3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -46,6 +46,11 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
+   public boolean allowReferenceCallback() {
+      return false;
+   }
+
+   @Override
    public Object getProtocolData() {
       return null;
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index f103d41..39200e6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -500,10 +500,16 @@ public class ConsumerTest extends ActiveMQTestBase {
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          javax.jms.Queue queue = session.createQueue(QUEUE.toString());
          MessageProducer producer = session.createProducer(queue);
-         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         if (durable) {
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         } else {
+
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         }
 
          long time = System.currentTimeMillis();
-         int NUMBER_OF_MESSAGES = 100;
+         int NUMBER_OF_MESSAGES = durable ? 500 : 5000;
          for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
             TextMessage msg = session.createTextMessage("hello " + i);
             msg.setIntProperty("mycount", i);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index dc57a12..3e64ac5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -242,6 +242,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
          }
 
          @Override
+         public boolean allowsReferenceCallback() {
+            return false;
+         }
+
+         @Override
          public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter) throws Exception {
             latchDelete.countDown();
             blocked.acquire();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index bea1167..15ac691 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -84,6 +84,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public boolean allowsReferenceCallback() {
+      return false;
+   }
+
+   @Override
    public boolean isExclusive() {
       // no-op
       return false;
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
index 40fadf9..0140263 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -335,6 +336,10 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void updated(QueueBinding binding) {
+      }
+
+      @Override
       public boolean redistribute(Message message,
                                   Queue originatingQueue,
                                   RoutingContext context) throws Exception {

Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] 03/04: ARTEMIS-2205 Optimizing some Lambda usages

nigrofranz
In reply to this post by nigrofranz
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 8281e3b58f3f64abe2c7321aeb2c9de07d5160ab
Author: Francesco Nigro <[hidden email]>
AuthorDate: Mon Dec 17 09:12:19 2018 -0500

    ARTEMIS-2205 Optimizing some Lambda usages
   
    https://issues.apache.org/jira/browse/ARTEMIS-2205
---
 .../amqp/proton/ProtonServerSenderContext.java     | 16 +++++++------
 .../core/paging/cursor/PagedReferenceImpl.java     | 26 ++++++++++++---------
 .../artemis/core/server/MessageReference.java      | 11 ++++++++-
 .../core/server/MessageReferenceCallback.java      | 27 ----------------------
 .../artemis/core/server/impl/LastValueQueue.java   |  5 ++--
 .../core/server/impl/MessageReferenceImpl.java     | 25 +++++++++++---------
 6 files changed, 51 insertions(+), 59 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 843d1fe..4caf2d0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
@@ -53,6 +52,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -79,7 +79,7 @@ import org.jboss.logging.Logger;
 /**
  * This is the Equivalent for the ServerConsumer
  */
-public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback {
+public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
 
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
 
@@ -92,7 +92,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();
 
    private Consumer brokerConsumer;
-
+   private ReadyListener onflowControlReady;
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
    protected final AMQPConnectionContext connection;
@@ -117,6 +117,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
     * to sync the credits we have versus the credits that are being held in proton
     * */
    private final Object creditsLock = new Object();
+   private final java.util.function.Consumer<? super MessageReference> executeDelivery;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection,
                                     Sender sender,
@@ -127,6 +128,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       this.sender = sender;
       this.protonSession = protonSession;
       this.sessionSPI = server;
+      this.executeDelivery = this::executeDelivery;
    }
 
    public Object getBrokerConsumer() {
@@ -164,7 +166,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    }
 
    public boolean hasCredits() {
-      if (!connection.flowControl(brokerConsumer::promptDelivery)) {
+      if (!connection.flowControl(onflowControlReady)) {
          return false;
       }
 
@@ -488,6 +490,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
          brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+         onflowControlReady = brokerConsumer::promptDelivery;
       } catch (ActiveMQAMQPResourceLimitExceededException e1) {
          throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (ActiveMQSecurityException e) {
@@ -747,7 +750,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
 
          if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
-            messageReference.setCallback(this);
+            messageReference.onDelivery(executeDelivery);
             connection.runNow((Runnable)messageReference);
          } else {
             connection.runNow(() -> executeDelivery(messageReference));
@@ -760,8 +763,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
-   @Override
-   public void executeDelivery(MessageReference messageReference) {
+   private void executeDelivery(MessageReference messageReference) {
 
       try {
          if (sender.getLocalState() == EndpointState.CLOSED) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index e05a9af..893e3a7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -18,13 +18,13 @@ package org.apache.activemq.artemis.core.paging.cursor;
 
 import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
@@ -75,7 +75,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private long messageSize = -1;
 
-   private MessageReferenceCallback callback;
+   private Consumer<? super MessageReference> onDelivery;
 
    @Override
    public Object getProtocolData() {
@@ -93,22 +93,26 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
    }
 
    @Override
-   public void setCallback(MessageReferenceCallback callback) {
-      this.callback = callback;
+   public void onDelivery(Consumer<? super MessageReference> onDelivery) {
+      assert this.onDelivery == null;
+      this.onDelivery = onDelivery;
    }
 
+   /**
+    * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
+    */
    @Override
    public void run() {
-      MessageReferenceCallback callback = this.callback;
-
-      try {
-         if (callback != null) {
-            callback.executeDelivery(this);
+      final Consumer<? super MessageReference> onDelivery = this.onDelivery;
+      if (onDelivery != null) {
+         try {
+            onDelivery.accept(this);
+         } finally {
+            this.onDelivery = null;
          }
-      } finally {
-         this.callback = null;
       }
    }
+
    @Override
    public synchronized PagedMessage getPagedMessage() {
       PagedMessage returnMessage = message != null ? message.get() : null;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 886af36..905f93d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.core.server;
 
 
+import java.util.function.Consumer;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -44,7 +46,14 @@ public interface MessageReference {
 
    SimpleString getLastValueProperty();
 
-   void setCallback(MessageReferenceCallback callback);
+   /**
+    * This is to be used in cases where a message delivery happens on an executor.
+    * Most MessageReference implementations will allow execution, and if it does,
+    * and the protocol requires an execution per message, this callback may be used.
+    *
+    * At the time of this implementation only AMQP was used.
+    */
+   void onDelivery(Consumer<? super MessageReference> callback);
 
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java
deleted file mode 100644
index 4804dde..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server;
-
-/** This is to be used in cases where a message delivery happens on an executor.
- *  Most MessageReference implementations will allow execution, and if it does,
- *  and the protocol requires an execution per message, this callback may be used.
- *
- *  At the time of this implementation only AMQP was used. */
-public interface MessageReferenceCallback {
-   void executeDelivery(MessageReference reference);
-}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 2fd70b6..0ebd7a8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -33,7 +34,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
  * This is useful for example, for stock prices, where you're only interested in the latest value
  * for a particular stock
  */
+@SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
    private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
@@ -238,7 +239,7 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
-      public void setCallback(MessageReferenceCallback callback) {
+      public void onDelivery(Consumer<? super MessageReference> callback) {
          // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 6d32c46..12acffd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -17,12 +17,12 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -55,7 +55,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    private Object protocolData;
 
-   private MessageReferenceCallback callback;
+   private Consumer<? super MessageReference> onDelivery;
 
    // Static --------------------------------------------------------
 
@@ -88,20 +88,23 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    // MessageReference implementation -------------------------------
 
    @Override
-   public void setCallback(MessageReferenceCallback callback) {
-      this.callback = callback;
+   public void onDelivery(Consumer<? super MessageReference> onDelivery) {
+      assert this.onDelivery == null;
+      this.onDelivery = onDelivery;
    }
 
+   /**
+    * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
+    */
    @Override
    public void run() {
-      MessageReferenceCallback callback = this.callback;
-
-      try {
-         if (callback != null) {
-            callback.executeDelivery(this);
+      final Consumer<? super MessageReference> onDelivery = this.onDelivery;
+      if (onDelivery != null) {
+         try {
+            onDelivery.accept(this);
+         } finally {
+            this.onDelivery = null;
          }
-      } finally {
-         this.callback = null;
       }
    }
 

Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] 04/04: This closes #2467

nigrofranz
In reply to this post by nigrofranz
This is an automated email from the ASF dual-hosted git repository.

nigrofranz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit e5e57442a2a64b60d7c7f388db2dc7839753e50c
Merge: 7b34b56 8281e3b
Author: Francesco Nigro <[hidden email]>
AuthorDate: Thu Jan 10 16:39:56 2019 +0100

    This closes #2467

 .../artemis/cli/commands/etc/artemis.profile       |   3 +
 .../artemis/api/core/TransportConfiguration.java   |  11 +
 .../core/remoting/impl/netty/NettyConnection.java  | 125 +++------
 .../org/apache/activemq/artemis/junit/Wait.java    |   7 +-
 .../amqp/broker/AMQPConnectionCallback.java        |  14 +-
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 165 ++++--------
 .../broker/ActiveMQProtonRemotingConnection.java   |   3 +-
 .../amqp/broker/ProtonProtocolManager.java         |  10 +
 .../amqp/proton/AMQPConnectionContext.java         | 228 ++++++++--------
 .../protocol/amqp/proton/AMQPSessionContext.java   |  40 ++-
 .../amqp/proton/ProtonServerReceiverContext.java   | 136 +++++-----
 .../amqp/proton/ProtonServerSenderContext.java     | 196 +++++++++-----
 .../amqp/proton/handler/ExecutorNettyAdapter.java  | 221 ++++++++++++++++
 .../amqp/proton/handler/ProtonHandler.java         | 287 +++++++++++----------
 .../transaction/ProtonTransactionHandler.java      |  14 +-
 .../proton/transaction/ProtonTransactionImpl.java  |  25 +-
 .../amqp/broker/AMQPSessionCallbackTest.java       |  83 +++---
 .../core/paging/cursor/PagedReferenceImpl.java     |  26 +-
 .../activemq/artemis/core/postoffice/Binding.java  |   4 +
 .../activemq/artemis/core/postoffice/Bindings.java |   3 +
 .../artemis/core/postoffice/impl/BindingsImpl.java | 145 +++++++----
 .../core/postoffice/impl/LocalQueueBinding.java    |   5 +
 .../core/postoffice/impl/PostOfficeImpl.java       | 161 ++++++------
 .../remoting/server/impl/RemotingServiceImpl.java  |   2 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   2 +-
 .../activemq/artemis/core/server/Consumer.java     |   8 +
 .../artemis/core/server/MessageReference.java      |  11 +
 .../apache/activemq/artemis/core/server/Queue.java |   8 +
 .../artemis/core/server/RoutingContext.java        |  29 +++
 .../artemis/core/server/ServerConsumer.java        |   6 +-
 .../artemis/core/server/ServerSession.java         |  17 ++
 .../artemis/core/server/impl/LastValueQueue.java   |  12 +
 .../core/server/impl/MessageReferenceImpl.java     |  26 +-
 .../artemis/core/server/impl/QueueImpl.java        |  54 ++--
 .../core/server/impl/RoutingContextImpl.java       |  82 +++++-
 .../core/server/impl/ServerConsumerImpl.java       |  22 +-
 .../core/server/impl/ServerSessionImpl.java        |  41 ++-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   5 +
 .../integration/addressing/AddressingTest.java     |   2 +-
 .../integration/amqp/AmqpExpiredMessageTest.java   |   2 +-
 .../integration/amqp/AmqpFlowControlFailTest.java  |   4 +-
 .../integration/amqp/AmqpSendReceiveTest.java      |  49 ++++
 .../integration/amqp/AmqpTransactionTest.java      |   2 +-
 .../integration/amqp/JMSNonDestructiveTest.java    |   8 +-
 .../tests/integration/cli/DummyServerConsumer.java |   5 +
 .../tests/integration/client/ConsumerTest.java     |  10 +-
 .../tests/integration/client/HangConsumerTest.java |   5 +
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 +
 .../impl/WildcardAddressManagerUnitTest.java       |   5 +
 49 files changed, 1529 insertions(+), 805 deletions(-)