[GitHub] asfgit closed pull request #2467: ARTEMIS-2205 Performance improvements on AMQP and other parts

View: Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] asfgit closed pull request #2467: ARTEMIS-2205 Performance improvements on AMQP and other parts

GitBox
asfgit closed pull request #2467: ARTEMIS-2205 Performance improvements on AMQP and other parts
URL: https://github.com/apache/activemq-artemis/pull/2467
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once the request has been merged, so the diff is supplied below:

diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
index 876b4cba96..af8aa86daf 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
@@ -43,3 +43,6 @@ JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -D
 
 # Debug args: Uncomment to enable debug
 #DEBUG_ARGS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
+
+# Debug args: Uncomment for async profiler
+#DEBUG_ARGS="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints"
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
index 7fcfcd5e1b..ee285a314e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
@@ -183,6 +183,17 @@ public String getFactoryClassName() {
       return extraProps;
    }
 
+   public Map<String, Object> getCombinedParams() {
+      Map<String, Object> combined = new HashMap<>();
+      if (params != null) {
+         combined.putAll(params);
+      }
+      if (extraProps != null) {
+         combined.putAll(extraProps);
+      }
+      return combined;
+   }
+
    @Override
    public int hashCode() {
       int result = name != null ? name.hashCode() : 0;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index f8195fbdce..1032a352bb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
 import io.netty.buffer.ByteBuf;
@@ -60,19 +59,12 @@
     * here for when the connection (or Netty Channel) becomes available again.
     */
    private final List<ReadyListener> readyListeners = new ArrayList<>();
-   private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = ThreadLocal.withInitial(ArrayList::new);
+   private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = new ThreadLocal<>();
 
    private final boolean batchingEnabled;
    private final int writeBufferHighWaterMark;
    private final int batchLimit;
 
-   /**
-    * This counter is splitted in 2 variables to write it with less performance
-    * impact: no volatile get is required to update its value
-    */
-   private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong();
-   private long pendingWritesOnEventLoop = 0;
-
    private boolean closed;
    private RemotingConnection protocolConnection;
 
@@ -129,18 +121,6 @@ public final int pendingWritesOnChannel() {
       return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    }
 
-   public final long pendingWritesOnEventLoop() {
-      final EventLoop eventLoop = channel.eventLoop();
-      final boolean inEventLoop = eventLoop.inEventLoop();
-      final long pendingWritesOnEventLoop;
-      if (inEventLoop) {
-         pendingWritesOnEventLoop = this.pendingWritesOnEventLoop;
-      } else {
-         pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get();
-      }
-      return pendingWritesOnEventLoop;
-   }
-
    public final Channel getNettyChannel() {
       return channel;
    }
@@ -163,19 +143,27 @@ public final boolean isWritable(ReadyListener callback) {
 
    @Override
    public final void fireReady(final boolean ready) {
-      final ArrayList<ReadyListener> readyToCall = localListenersPool.get();
+      ArrayList<ReadyListener> readyToCall = localListenersPool.get();
+      if (readyToCall != null) {
+         localListenersPool.set(null);
+      }
       synchronized (readyListeners) {
          this.ready = ready;
 
          if (ready) {
             final int size = this.readyListeners.size();
-            readyToCall.ensureCapacity(size);
+            if (readyToCall != null) {
+               readyToCall.ensureCapacity(size);
+            }
             try {
                for (int i = 0; i < size; i++) {
                   final ReadyListener readyListener = readyListeners.get(i);
                   if (readyListener == null) {
                      break;
                   }
+                  if (readyToCall == null) {
+                     readyToCall = new ArrayList<>(size);
+                  }
                   readyToCall.add(readyListener);
                }
             } finally {
@@ -183,18 +171,23 @@ public final void fireReady(final boolean ready) {
             }
          }
       }
-      try {
-         final int size = readyToCall.size();
-         for (int i = 0; i < size; i++) {
-            try {
-               final ReadyListener readyListener = readyToCall.get(i);
-               readyListener.readyForWriting();
-            } catch (Throwable logOnly) {
-               ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
+      if (readyToCall != null) {
+         try {
+            readyToCall.forEach(readyListener -> {
+               try {
+                  readyListener.readyForWriting();
+               } catch (Throwable logOnly) {
+                  ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
+               }
+            });
+         } catch (Throwable t) {
+            ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
+         } finally {
+            readyToCall.clear();
+            if (localListenersPool.get() != null) {
+               localListenersPool.set(readyToCall);
             }
          }
-      } finally {
-         readyToCall.clear();
       }
    }
 
@@ -256,7 +249,7 @@ public ActiveMQBuffer createTransportBuffer(final int size) {
       } catch (OutOfMemoryError oom) {
          final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
          // I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
-         logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
+         logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
          throw oom;
       }
    }
@@ -342,10 +335,7 @@ private boolean isAllowedToBlock() {
    private boolean canWrite(final int requiredCapacity) {
       //evaluate if the write request could be taken:
       //there is enough space in the write buffer?
-      //The pending writes on event loop will eventually go into the Netty write buffer, hence consider them
-      //as part of the heuristic!
-      final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop();
-      final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel();
+      final long totalPendingWrites = this.pendingWritesOnChannel();
       final boolean canWrite;
       if (requiredCapacity > this.writeBufferHighWaterMark) {
          canWrite = totalPendingWrites == 0;
@@ -369,34 +359,6 @@ public final void write(ActiveMQBuffer buffer,
       }
       //no need to lock because the Netty's channel is thread-safe
       //and the order of write is ensured by the order of the write calls
-      final EventLoop eventLoop = channel.eventLoop();
-      final boolean inEventLoop = eventLoop.inEventLoop();
-      if (!inEventLoop) {
-         writeNotInEventLoop(buffer, flush, batched, futureListener);
-      } else {
-         // OLD COMMENT:
-         // create a task which will be picked up by the eventloop and trigger the write.
-         // This is mainly needed as this method is triggered by different threads for the same channel.
-         // if we not do this we may produce out of order writes.
-         // NOTE:
-         // the submitted task does not effect in any way the current written size in the batch
-         // until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!!
-         // To solve it, will be necessary to manually perform the count of the current batch instead of rely on the
-         // Channel:Config::writeBufferHighWaterMark value.
-         this.pendingWritesOnEventLoop += readableBytes;
-         this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
-         eventLoop.execute(() -> {
-            this.pendingWritesOnEventLoop -= readableBytes;
-            this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
-            writeInEventLoop(buffer, flush, batched, futureListener);
-         });
-      }
-   }
-
-   private void writeNotInEventLoop(ActiveMQBuffer buffer,
-                                    final boolean flush,
-                                    final boolean batched,
-                                    final ChannelFutureListener futureListener) {
       final Channel channel = this.channel;
       final ChannelPromise promise;
       if (flush || (futureListener != null)) {
@@ -406,7 +368,6 @@ private void writeNotInEventLoop(ActiveMQBuffer buffer,
       }
       final ChannelFuture future;
       final ByteBuf bytes = buffer.byteBuf();
-      final int readableBytes = bytes.readableBytes();
       assert readableBytes >= 0;
       final int writeBatchSize = this.batchLimit;
       final boolean batchingEnabled = this.batchingEnabled;
@@ -420,33 +381,17 @@ private void writeNotInEventLoop(ActiveMQBuffer buffer,
       }
       if (flush) {
          //NOTE: this code path seems used only on RemotingConnection::disconnect
-         waitFor(promise, DEFAULT_WAIT_MILLIS);
+         flushAndWait(channel, promise);
       }
    }
 
-   private void writeInEventLoop(ActiveMQBuffer buffer,
-                                 final boolean flush,
-                                 final boolean batched,
-                                 final ChannelFutureListener futureListener) {
-      //no need to lock because the Netty's channel is thread-safe
-      //and the order of write is ensured by the order of the write calls
-      final ChannelPromise promise;
-      if (futureListener != null) {
-         promise = channel.newPromise();
-      } else {
-         promise = channel.voidPromise();
-      }
-      final ChannelFuture future;
-      final ByteBuf bytes = buffer.byteBuf();
-      final int readableBytes = bytes.readableBytes();
-      final int writeBatchSize = this.batchLimit;
-      if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
-         future = writeBatch(bytes, readableBytes, promise);
+   private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
+      if (!channel.eventLoop().inEventLoop()) {
+         waitFor(promise, DEFAULT_WAIT_MILLIS);
       } else {
-         future = channel.writeAndFlush(bytes, promise);
-      }
-      if (futureListener != null) {
-         future.addListener(futureListener);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Calling write with flush from a thread where it's not allowed");
+         }
       }
    }
 
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
index c0aa55d40c..5c817fb1cc 100644
--- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
@@ -94,7 +94,12 @@ public static void assertFalse(String failureMessage, Condition condition) throw
 
 
    public static void assertTrue(String failureMessage, Condition condition) throws Exception {
-      boolean result = waitFor(condition);
+      assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
+   }
+
+   public static void assertTrue(String failureMessage, Condition condition, final long duration) throws Exception {
+
+      boolean result = waitFor(condition, duration);
 
       if (!result) {
          Assert.fail(failureMessage);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 84fdd24aa6..d34ce80fca 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -73,7 +73,7 @@
 
    protected AMQPConnectionContext amqpConnection;
 
-   private final Executor closeExecutor;
+   private final Executor sessionExecutor;
 
    private String remoteContainerId;
 
@@ -85,15 +85,19 @@
 
    public AMQPConnectionCallback(ProtonProtocolManager manager,
                                  Connection connection,
-                                 Executor closeExecutor,
+                                 Executor sessionExecutor,
                                  ActiveMQServer server) {
       this.manager = manager;
       this.connection = connection;
-      this.closeExecutor = closeExecutor;
+      this.sessionExecutor = sessionExecutor;
       this.server = server;
       saslMechanisms = manager.getSaslMechanisms();
    }
 
+   public Connection getTransportConnection() {
+      return connection;
+   }
+
    public String[] getSaslMechanisms() {
       return saslMechanisms;
    }
@@ -213,7 +217,7 @@ public boolean isWritable(ReadyListener readyListener) {
 
 
    public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
-      return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
+      return new AMQPSessionCallback(this, manager, connection, this.connection, sessionExecutor, server.newOperationContext());
    }
 
    public void sendSASLSupported() {
@@ -256,7 +260,7 @@ public void connectionFailed(ActiveMQException exception, boolean failedOver, St
    public Binary newTransaction() {
       XidImpl xid = newXID();
       Binary binary = new Binary(xid.getGlobalTransactionId());
-      Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
+      Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1, amqpConnection);
       transactions.put(binary, transaction);
       return binary;
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 1ca4410a4d..0e2cf6dfcb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -40,15 +37,14 @@
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerProducer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
-import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -104,7 +100,8 @@
 
    private final Executor sessionExecutor;
 
-   private final AtomicBoolean draining = new AtomicBoolean(false);
+   private final boolean directDeliver;
+
 
    private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
@@ -125,6 +122,7 @@ public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
       this.transportConnection = transportConnection;
       this.sessionExecutor = executor;
       this.operationContext = operationContext;
+      this.directDeliver = manager.isDirectDeliver();
    }
 
    @Override
@@ -133,28 +131,6 @@ public boolean isWritable(ReadyListener callback, Object protocolContext) {
       return transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED;
    }
 
-   public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
-      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
-      if (drain) {
-         // If the draining is already running, then don't do anything
-         if (draining.compareAndSet(false, true)) {
-            final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
-            serverConsumer.forceDelivery(1, new Runnable() {
-               @Override
-               public void run() {
-                  try {
-                     plugSender.reportDrained();
-                  } finally {
-                     draining.set(false);
-                  }
-               }
-            });
-         }
-      } else {
-         serverConsumer.receiveCredits(-1);
-      }
-   }
-
    public void withinContext(RunnableEx run) throws Exception {
       OperationContext context = recoverContext();
       try {
@@ -180,7 +156,7 @@ public void browserFinished(ServerConsumer consumer) {
 
    @Override
    public boolean supportsDirectDelivery() {
-      return false;
+      return manager.isDirectDeliver();
    }
 
    public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
@@ -347,7 +323,6 @@ public boolean checkAddressAndAutocreateIfPossible(SimpleString address, Routing
       return result;
    }
 
-
    public AddressQueryResult addressQuery(SimpleString addressName,
                                           RoutingType routingType,
                                           boolean autoCreate) throws Exception {
@@ -373,41 +348,8 @@ public AddressQueryResult addressQuery(SimpleString addressName,
    }
 
    public void closeSender(final Object brokerConsumer) throws Exception {
-
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      Runnable runnable = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               consumer.close(false);
-               latch.countDown();
-            } catch (Exception e) {
-            }
-         }
-      };
-
-      // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
-      // to avoid deadlocks the close has to be done outside of the main thread on an executor
-      // otherwise you could get a deadlock
-      Executor executor = protonSPI.getExeuctor();
-
-      if (executor != null) {
-         executor.execute(runnable);
-      } else {
-         runnable.run();
-      }
-
-      try {
-         // a short timeout will do.. 1 second is already long enough
-         if (!latch.await(1, TimeUnit.SECONDS)) {
-            logger.debug("Could not close consumer on time");
-         }
-      } catch (InterruptedException e) {
-         throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
-      }
-
+      consumer.close(false);
       consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
    }
 
@@ -418,12 +360,19 @@ public String tempQueueName() {
    public void close() throws Exception {
       //need to check here as this can be called if init fails
       if (serverSession != null) {
-         OperationContext context = recoverContext();
-         try {
-            serverSession.close(false);
-         } finally {
-            resetContext(context);
-         }
+         // we cannot hold the nettyExecutor on this rollback here, otherwise other connections will be waiting
+         sessionExecutor.execute(() -> {
+            OperationContext context = recoverContext();
+            try {
+               try {
+                  serverSession.close(false);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            } finally {
+               resetContext(context);
+            }
+         });
       }
    }
 
@@ -468,7 +417,8 @@ public void serverSend(final ProtonServerReceiverContext context,
                           final Delivery delivery,
                           SimpleString address,
                           int messageFormat,
-                          ReadableBuffer data) throws Exception {
+                          ReadableBuffer data,
+                          RoutingContext routingContext) throws Exception {
       AMQPMessage message = new AMQPMessage(messageFormat, data, null, coreMessageObjectPools);
       if (address != null) {
          message.setAddress(address);
@@ -503,7 +453,7 @@ public void serverSend(final ProtonServerReceiverContext context,
                rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
             }
          } else {
-            serverSend(transaction, message, delivery, receiver);
+            serverSend(context, transaction, message, delivery, receiver, routingContext);
          }
       } finally {
          resetContext(oldcontext);
@@ -520,14 +470,11 @@ private void rejectMessage(Delivery delivery, Symbol errorCondition, String erro
       afterIO(new IOCallback() {
          @Override
          public void done() {
-            connection.lock();
-            try {
+            connection.runLater(() -> {
                delivery.disposition(rejected);
                delivery.settle();
-            } finally {
-               connection.unlock();
-            }
-            connection.flush();
+               connection.flush();
+            });
          }
 
          @Override
@@ -538,19 +485,20 @@ public void onError(int errorCode, String errorMessage) {
 
    }
 
-   private void serverSend(final Transaction transaction,
+   private void serverSend(final ProtonServerReceiverContext context,
+                           final Transaction transaction,
                            final Message message,
                            final Delivery delivery,
-                           final Receiver receiver) throws Exception {
+                           final Receiver receiver,
+                           final RoutingContext routingContext) throws Exception {
       message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
       invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
-      serverSession.send(transaction, message, false, false);
+      serverSession.send(transaction, message, directDeliver, false, routingContext);
 
       afterIO(new IOCallback() {
          @Override
          public void done() {
-            connection.lock();
-            try {
+            connection.runLater(() -> {
                if (delivery.getRemoteState() instanceof TransactionalState) {
                   TransactionalState txAccepted = new TransactionalState();
                   txAccepted.setOutcome(Accepted.getInstance());
@@ -561,21 +509,17 @@ public void done() {
                   delivery.disposition(Accepted.getInstance());
                }
                delivery.settle();
-            } finally {
-               connection.unlock();
-            }
-            connection.flush();
+               context.flow();
+               connection.flush();
+            });
          }
 
          @Override
          public void onError(int errorCode, String errorMessage) {
-            connection.lock();
-            try {
+            connection.runNow(() -> {
                receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
                connection.flush();
-            } finally {
-               connection.unlock();
-            }
+            });
          }
       });
    }
@@ -635,15 +579,12 @@ public int sendMessage(MessageReference ref, Message message, ServerConsumer con
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
 
       try {
-         return plugSender.deliverMessage(ref, deliveryCount, transportConnection);
+         return plugSender.deliverMessage(ref, consumer);
       } catch (Exception e) {
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
             connection.flush();
-         } finally {
-            connection.unlock();
-         }
+         });
          throw new IllegalStateException("Can't deliver message " + e, e);
       }
 
@@ -673,23 +614,22 @@ public void closed() {
    @Override
    public void disconnect(ServerConsumer consumer, SimpleString queueName) {
       ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
-      connection.lock();
-      try {
-         ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
-         connection.flush();
-      } catch (ActiveMQAMQPException e) {
-         logger.error("Error closing link for " + consumer.getQueue().getAddress());
-      } finally {
-         connection.unlock();
-      }
+      connection.runNow(() -> {
+         try {
+            ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
+            connection.flush();
+         } catch (ActiveMQAMQPException e) {
+            logger.error("Error closing link for " + consumer.getQueue().getAddress());
+         }
+      });
    }
 
    @Override
    public boolean hasCredits(ServerConsumer consumer) {
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
 
-      if (plugSender != null && plugSender.getSender().getCredit() > 0) {
-         return true;
+      if (plugSender != null) {
+         return plugSender.hasCredits();
       } else {
          return false;
       }
@@ -757,6 +697,10 @@ public void setTransactionHandler(ProtonTransactionHandler transactionHandler) {
       this.transactionHandler = transactionHandler;
    }
 
+   public Connection getTransportConnection() {
+      return transportConnection;
+   }
+
    public ProtonTransactionHandler getTransactionHandler() {
       return this.transactionHandler;
    }
@@ -782,4 +726,7 @@ public synchronized void setResult(SimpleString parameterAddress, T result) {
       }
 
    }
+   interface CreditRunnable extends Runnable {
+      boolean isRun();
+   }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 41f6e788c8..a06765d91e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -122,7 +122,8 @@ public void disconnect(boolean criticalError) {
       ErrorCondition errorCondition = new ErrorCondition();
       errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
       amqpConnection.close(errorCondition);
-      getTransportConnection().close();
+      // There's no need to flush, amqpConnection.close() is calling flush
+      // as long this semantic is kept no need to flush here
    }
 
    /**
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index d86dc81826..5b9aa38b49 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -77,6 +77,8 @@
 
    private Long amqpIdleTimeout;
 
+   private boolean directDeliver = true;
+
 
    /*
    * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
@@ -131,6 +133,14 @@ public ProtonProtocolManager setAmqpIdleTimeout(Long ttl) {
       return this;
    }
 
+   public boolean isDirectDeliver() {
+      return directDeliver;
+   }
+
+   public ProtonProtocolManager setDirectDeliver(boolean directDeliver) {
+      this.directDeliver = directDeliver;
+      return this;
+   }
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 35521064e8..07b2875c92 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
-
 import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,12 +25,16 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoop;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExecutorNettyAdapter;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
@@ -59,7 +57,11 @@
 import org.apache.qpid.proton.engine.Transport;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
 
 public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
 
@@ -111,7 +113,13 @@ public AMQPConnectionContext(ProtonProtocolManager protocolManager,
 
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
-      this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
+      EventLoop nettyExecutor;
+      if (connectionCallback.getTransportConnection() instanceof NettyConnection) {
+         nettyExecutor = ((NettyConnection) connectionCallback.getTransportConnection()).getNettyChannel().eventLoop();
+      } else {
+         nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor());
+      }
+      this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
       handler.addEventHandler(this);
       Transport transport = handler.getTransport();
       transport.setEmitFlowEventOnSend(false);
@@ -127,6 +135,10 @@ public AMQPConnectionContext(ProtonProtocolManager protocolManager,
       }
    }
 
+   public void requireInHandler() {
+      handler.requireHandler();
+   }
+
    public void scheduledFlush() {
       handler.scheduledFlush();
    }
@@ -159,35 +171,19 @@ public void inputBuffer(ByteBuf buffer) {
    }
 
    public void destroy() {
-      connectionCallback.close();
+      handler.runLater(() -> connectionCallback.close());
    }
 
    public boolean isSyncOnFlush() {
       return false;
    }
 
-   public boolean tryLock(long time, TimeUnit timeUnit) {
-      return handler.tryLock(time, timeUnit);
-   }
-
-   public void lock() {
-      handler.lock();
-   }
-
-   public void unlock() {
-      handler.unlock();
-   }
-
-   public int capacity() {
-      return handler.capacity();
-   }
-
    public void flush() {
       handler.flush();
    }
 
    public void close(ErrorCondition errorCondition) {
-      handler.close(errorCondition);
+      handler.close(errorCondition, this);
    }
 
    protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
@@ -201,6 +197,18 @@ protected AMQPSessionContext getSessionExtension(Session realSession) throws Act
       return sessionExtension;
    }
 
+   public void runOnPool(Runnable run) {
+      handler.runOnPool(run);
+   }
+
+   public void runNow(Runnable run) {
+      handler.runNow(run);
+   }
+
+   public void runLater(Runnable run) {
+      handler.runLater(run);
+   }
+
    protected boolean validateConnection(Connection connection) {
       return connectionCallback.validateConnection(connection, handler.getSASLResult());
    }
@@ -224,6 +232,10 @@ public String getPubSubPrefix() {
    protected void initInternal() throws Exception {
    }
 
+   public AMQPConnectionCallback getConnectionCallback() {
+      return connectionCallback;
+   }
+
    protected void remoteLinkOpened(Link link) throws Exception {
 
       AMQPSessionContext protonSession = getSessionExtension(link.getSession());
@@ -314,7 +326,7 @@ public void onAuthInit(ProtonHandler handler, Connection connection, boolean sas
          if (!connectionCallback.isSupportsAnonymous()) {
             connectionCallback.sendSASLSupported();
             connectionCallback.close();
-            handler.close(null);
+            handler.close(null, this);
          }
       }
    }
@@ -334,7 +346,7 @@ public void onSaslMechanismsOffered(final ProtonHandler handler, final String[]
    @Override
    public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
       connectionCallback.close();
-      handler.close(null);
+      handler.close(null, this);
    }
 
    @Override
@@ -359,59 +371,73 @@ public boolean flowControl(ReadyListener readyListener) {
 
    @Override
    public void onRemoteOpen(Connection connection) throws Exception {
-      lock();
+      handler.requireHandler();
       try {
-         try {
-            initInternal();
-         } catch (Exception e) {
-            log.error("Error init connection", e);
-         }
-         if (!validateConnection(connection)) {
-            connection.close();
-         } else {
-            connection.setContext(AMQPConnectionContext.this);
-            connection.setContainer(containerId);
-            connection.setProperties(connectionProperties);
-            connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-            connection.open();
-         }
-      } finally {
-         unlock();
+         initInternal();
+      } catch (Exception e) {
+         log.error("Error init connection", e);
+      }
+      if (!validateConnection(connection)) {
+         connection.close();
+      } else {
+         connection.setContext(AMQPConnectionContext.this);
+         connection.setContainer(containerId);
+         connection.setProperties(connectionProperties);
+         connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+         connection.open();
       }
       initialise();
 
-         /*
-         * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
-         * but its here in case we add support for outbound connections.
-         * */
+      /*
+      * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
+      * but its here in case we add support for outbound connections.
+      * */
       if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
          long nextKeepAliveTime = handler.tick(true);
          if (nextKeepAliveTime != 0 && scheduledPool != null) {
-            scheduledPool.schedule(new Runnable() {
-               @Override
-               public void run() {
-                  Long rescheduleAt = handler.tick(false);
-                  if (rescheduleAt == null) {
-                     // this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
-                     scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
-                  } else if (rescheduleAt != 0) {
-                     scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
-                  }
-               }
-            }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+            scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+         }
+      }
+   }
+
+   class TickerRunnable implements Runnable {
+
+      final ScheduleRunnable scheduleRunnable;
+
+      TickerRunnable(ScheduleRunnable scheduleRunnable) {
+         this.scheduleRunnable = scheduleRunnable;
+      }
+
+      @Override
+      public void run() {
+         Long rescheduleAt = handler.tick(false);
+         if (rescheduleAt == null) {
+            // this means tick could not acquire a lock, so we will just retry in 10 milliseconds.
+            scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS);
+         } else if (rescheduleAt != 0) {
+            scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
          }
       }
    }
 
+   class ScheduleRunnable implements Runnable {
+
+      TickerRunnable tickerRunnable = new TickerRunnable(this);
+
+      @Override
+      public void run() {
+
+         // The actual tick has to happen within a Netty Worker, to avoid requiring a lock
+         // this will also be used to flush the data directly into netty connection's executor
+         handler.runLater(tickerRunnable);
+      }
+   }
+
    @Override
    public void onRemoteClose(Connection connection) {
-      lock();
-      try {
-         connection.close();
-         connection.free();
-      } finally {
-         unlock();
-      }
+      handler.requireHandler();
+      connection.close();
+      connection.free();
 
       for (AMQPSessionContext protonSession : sessions.values()) {
          protonSession.close();
@@ -430,31 +456,24 @@ public void onLocalOpen(Session session) throws Exception {
 
    @Override
    public void onRemoteOpen(Session session) throws Exception {
+      handler.requireHandler();
       getSessionExtension(session).initialise();
-      lock();
-      try {
-         session.open();
-      } finally {
-         unlock();
-      }
+      session.open();
    }
 
    @Override
    public void onRemoteClose(Session session) throws Exception {
-      lock();
-      try {
+      handler.runLater(() -> {
          session.close();
          session.free();
-      } finally {
-         unlock();
-      }
 
-      AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
-      if (sessionContext != null) {
-         sessionContext.close();
-         sessions.remove(session);
-         session.setContext(null);
-      }
+         AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
+         if (sessionContext != null) {
+            sessionContext.close();
+            sessions.remove(session);
+            session.setContext(null);
+         }
+      });
    }
 
    @Override
@@ -471,40 +490,42 @@ public void onFlow(Link link) throws Exception {
 
    @Override
    public void onRemoteClose(Link link) throws Exception {
-      lock();
-      try {
+      handler.requireHandler();
+
+      // We scheduled it for later, as that will work through anything that's pending on the current deliveries.
+      runNow(() -> {
          link.close();
          link.free();
-      } finally {
-         unlock();
-      }
 
-      ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
-      if (linkContext != null) {
-         linkContext.close(true);
-      }
+         ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
+         if (linkContext != null) {
+            try {
+               linkContext.close(true);
+            } catch (Exception e) {
+               log.error(e.getMessage(), e);
+            }
+         }
+         flush();
+
+      });
    }
 
    @Override
    public void onRemoteDetach(Link link) throws Exception {
-      boolean handleAsClose = link.getSource() != null
-                              && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
+      handler.requireHandler();
+      boolean handleAsClose = link.getSource() != null && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
 
       if (handleAsClose) {
          onRemoteClose(link);
       } else {
-         lock();
-         try {
-            link.detach();
-            link.free();
-         } finally {
-            unlock();
-         }
+         link.detach();
+         link.free();
       }
    }
 
    @Override
    public void onLocalDetach(Link link) throws Exception {
+      handler.requireHandler();
       Object context = link.getContext();
       if (context instanceof ProtonServerSenderContext) {
          ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
@@ -514,6 +535,7 @@ public void onLocalDetach(Link link) throws Exception {
 
    @Override
    public void onDelivery(Delivery delivery) throws Exception {
+      handler.requireHandler();
       ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
       if (handler != null) {
          handler.onMessage(delivery);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 5cd3515203..c8bb13e0c4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -150,13 +150,11 @@ public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
       coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
 
       receiver.setContext(transactionHandler);
-      connection.lock();
-      try {
+      connection.runNow(() -> {
          receiver.open();
          receiver.flow(connection.getAmqpCredits());
-      } finally {
-         connection.unlock();
-      }
+         connection.flush();
+      });
    }
 
    public void addSender(Sender sender) throws Exception {
@@ -169,24 +167,20 @@ public void addSender(Sender sender) throws Exception {
          senders.put(sender, protonSender);
          serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
          sender.setContext(protonSender);
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             sender.open();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
 
          protonSender.start();
       } catch (ActiveMQAMQPException e) {
          senders.remove(sender);
          sender.setSource(null);
          sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             sender.close();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
       }
    }
 
@@ -206,22 +200,18 @@ public void addReceiver(Receiver receiver) throws Exception {
          ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
          sessionSPI.addProducer(serverProducer);
          receiver.setContext(protonReceiver);
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             receiver.open();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
       } catch (ActiveMQAMQPException e) {
          receivers.remove(receiver);
          receiver.setTarget(null);
          receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         connection.lock();
-         try {
+         connection.runNow(() -> {
             receiver.close();
-         } finally {
-            connection.unlock();
-         }
+            connection.flush();
+         });
       }
    }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index b0cfba0a93..14463730a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -25,7 +25,9 @@
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -49,6 +51,9 @@
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
+/**
+ * This is the equivalent for the ServerProducer
+ */
 public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
 
    private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
@@ -63,35 +68,43 @@
 
    protected final AMQPSessionCallback sessionSPI;
 
-   /** We create this AtomicRunnable with setRan.
-    *  This is because we always reuse the same instance.
-    *  In case the creditRunnable was run, we reset and send it over.
-    *  We set it as ran as the first one should always go through */
-   protected final AtomicRunnable creditRunnable;
+   RoutingContext routingContext = new RoutingContextImpl(null);
 
+   /**
+    * We create this AtomicRunnable with setRan.
+    * This is because we always reuse the same instance.
+    * In case the creditRunnable was run, we reset and send it over.
+    * We set it as ran as the first one should always go through
+    */
+   protected final AtomicRunnable creditRunnable;
 
-   /** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */
-   public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
+   /**
+    * This Credit Runnable may be used in Mock tests to simulate the credit semantic here
+    */
+   public static AtomicRunnable createCreditRunnable(int refill,
+                                                     int threshold,
+                                                     Receiver receiver,
+                                                     AMQPConnectionContext connection) {
+      Runnable creditRunnable = () -> {
+
+         connection.requireInHandler();
+         if (receiver.getCredit() <= threshold) {
+            int topUp = refill - receiver.getCredit();
+            if (topUp > 0) {
+               // System.out.println("Sending " + topUp + " towards client");
+               receiver.flow(topUp);
+               connection.flush();
+            }
+         }
+      };
       return new AtomicRunnable() {
          @Override
          public void atomicRun() {
-            connection.lock();
-            try {
-               if (receiver.getCredit() <= threshold) {
-                  int topUp = refill - receiver.getCredit();
-                  if (topUp > 0) {
-                     receiver.flow(topUp);
-                  }
-               }
-            } finally {
-               connection.unlock();
-            }
-            connection.flush();
+            connection.runNow(creditRunnable);
          }
       };
    }
 
-
    /*
     The maximum number of credits we will allocate to clients.
     This number is also used by the broker when refresh client credits.
@@ -249,41 +262,46 @@ private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
     */
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      try {
-         Receiver receiver = ((Receiver) delivery.getLink());
-
-         if (receiver.current() != delivery) {
-            return;
-         }
+      connection.requireInHandler();
+      Receiver receiver = ((Receiver) delivery.getLink());
 
-         if (delivery.isAborted()) {
-            // Aborting implicitly remotely settles, so advance
-            // receiver to the next delivery and settle locally.
-            receiver.advance();
-            delivery.settle();
+      if (receiver.current() != delivery) {
+         return;
+      }
 
-            // Replenish the credit if not doing a drain
-            if (!receiver.getDrain()) {
-               receiver.flow(1);
-            }
+      if (delivery.isAborted()) {
+         // Aborting implicitly remotely settles, so advance
+         // receiver to the next delivery and settle locally.
+         receiver.advance();
+         delivery.settle();
 
-            return;
-         } else if (delivery.isPartial()) {
-            return;
+         // Replenish the credit if not doing a drain
+         if (!receiver.getDrain()) {
+            receiver.flow(1);
          }
 
-         Transaction tx = null;
-         ReadableBuffer data = receiver.recv();
-         receiver.advance();
+         return;
+      } else if (delivery.isPartial()) {
+         return;
+      }
 
-         if (delivery.getRemoteState() instanceof TransactionalState) {
-            TransactionalState txState = (TransactionalState) delivery.getRemoteState();
-            tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
-         }
+      ReadableBuffer data = receiver.recv();
+      receiver.advance();
+      Transaction tx = null;
+
+      if (delivery.getRemoteState() instanceof TransactionalState) {
+         TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+         tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
+      }
 
-         sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
+      final Transaction txUsed = tx;
 
-         flow();
+      actualDelivery(delivery, receiver, data, txUsed);
+   }
+
+   private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer data, Transaction tx) {
+      try {
+         sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext);
       } catch (Exception e) {
          log.warn(e.getMessage(), e);
          Rejected rejected = new Rejected();
@@ -294,13 +312,17 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
          } else {
             condition.setCondition(Symbol.valueOf("failed"));
          }
+         connection.runLater(() -> {
 
-         condition.setDescription(e.getMessage());
-         rejected.setError(condition);
+            condition.setDescription(e.getMessage());
+            rejected.setError(condition);
+
+            delivery.disposition(rejected);
+            delivery.settle();
+            flow();
+            connection.flush();
+         });
 
-         delivery.disposition(rejected);
-         delivery.settle();
-         flow();
       }
    }
 
@@ -324,6 +346,7 @@ public void close(ErrorCondition condition) throws ActiveMQAMQPException {
    }
 
    public void flow() {
+      connection.requireInHandler();
       if (!creditRunnable.isRun()) {
          return; // nothing to be done as the previous one did not run yet
       }
@@ -339,13 +362,10 @@ public void flow() {
    }
 
    public void drain(int credits) {
-      connection.lock();
-      try {
+      connection.runNow(() -> {
          receiver.drain(credits);
-      } finally {
-         connection.unlock();
-      }
-      connection.flush();
+         connection.flush();
+      });
    }
 
    public int drained() {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index c4aca48e64..4caf2d0047 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -20,7 +20,8 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -33,6 +34,8 @@
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -49,7 +52,7 @@
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -74,7 +77,7 @@
 import org.jboss.logging.Logger;
 
 /**
- * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
+ * This is the equivalent for the ServerConsumer
  */
 public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
 
@@ -89,7 +92,7 @@
    private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();
 
    private Consumer brokerConsumer;
-
+   private ReadyListener onflowControlReady;
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
    protected final AMQPConnectionContext connection;
@@ -104,6 +107,17 @@
    private boolean isVolatile = false;
    private boolean preSettle;
    private SimpleString tempQueueName;
+   private final AtomicBoolean draining = new AtomicBoolean(false);
+
+   private int credits = 0;
+
+   private AtomicInteger pending = new AtomicInteger(0);
+   /**
+    * The model proton uses requires us to hold a lock at certain times
+    * to sync the credits we have versus the credits that are being held in proton
+    * */
+   private final Object creditsLock = new Object();
+   private final java.util.function.Consumer<? super MessageReference> executeDelivery;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection,
                                     Sender sender,
@@ -114,6 +128,7 @@ public ProtonServerSenderContext(AMQPConnectionContext connection,
       this.sender = sender;
       this.protonSession = protonSession;
       this.sessionSPI = server;
+      this.executeDelivery = this::executeDelivery;
    }
 
    public Object getBrokerConsumer() {
@@ -122,7 +137,51 @@ public Object getBrokerConsumer() {
 
    @Override
    public void onFlow(int currentCredits, boolean drain) {
-      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
+      connection.requireInHandler();
+
+      setupCredit();
+
+      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer;
+      if (drain) {
+         // If the draining is already running, then don't do anything
+         if (draining.compareAndSet(false, true)) {
+            final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
+            serverConsumer.forceDelivery(1, new Runnable() {
+               @Override
+               public void run() {
+                  try {
+                     connection.runNow(() -> {
+                        plugSender.reportDrained();
+                        setupCredit();
+                     });
+                  } finally {
+                     draining.set(false);
+                  }
+               }
+            });
+         }
+      } else {
+         serverConsumer.receiveCredits(-1);
+      }
+   }
+
+   public boolean hasCredits() {
+      if (!connection.flowControl(onflowControlReady)) {
+         return false;
+      }
+
+      synchronized (creditsLock) {
+         return credits > 0 && sender.getLocalState() != EndpointState.CLOSED;
+      }
+   }
+
+   private void setupCredit() {
+      synchronized (creditsLock) {
+         this.credits = sender.getCredit() - pending.get();
+         if (credits < 0) {
+            credits = 0;
+         }
+      }
    }
 
    public Sender getSender() {
@@ -431,6 +490,7 @@ public void initialise() throws Exception {
       boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
          brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+         onflowControlReady = brokerConsumer::promptDelivery;
       } catch (ActiveMQAMQPResourceLimitExceededException e1) {
          throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (ActiveMQSecurityException e) {
@@ -469,20 +529,17 @@ public void close(ErrorCondition condition) throws ActiveMQAMQPException {
          sender.setCondition(condition);
       }
       protonSession.removeSender(sender);
-      connection.lock();
-      try {
-         sender.close();
-      } finally {
-         connection.unlock();
-      }
-      connection.flush();
 
-      try {
-         sessionSPI.closeSender(brokerConsumer);
-      } catch (Exception e) {
-         log.warn(e.getMessage(), e);
-         throw new ActiveMQAMQPInternalErrorException(e.getMessage());
-      }
+      connection.runLater(() -> {
+         sender.close();
+         try {
+            sessionSPI.closeSender(brokerConsumer);
+         } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+         }
+         sender.close();
+         connection.flush();
+      });
    }
 
    /*
@@ -666,12 +723,8 @@ public void onError(int errorCode, String errorMessage) {
    }
 
    public void settle(Delivery delivery) {
-      connection.lock();
-      try {
-         delivery.settle();
-      } finally {
-         connection.unlock();
-      }
+      connection.requireInHandler();
+      delivery.settle();
    }
 
    public synchronized void checkState() {
@@ -681,42 +734,58 @@ public synchronized void checkState() {
    /**
     * handle an out going message from ActiveMQ Artemis, send via the Proton Sender
     */
-   public int deliverMessage(MessageReference messageReference, int deliveryCount, Connection transportConnection) throws Exception {
+   public int deliverMessage(final MessageReference messageReference, final ServerConsumer consumer) throws Exception {
 
       if (closed) {
          return 0;
       }
 
-      AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
-      sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
+      try {
+         synchronized (creditsLock) {
+            if (sender.getLocalState() == EndpointState.CLOSED) {
+               return 0;
+            }
+            pending.incrementAndGet();
+            credits--;
+         }
 
-      // we only need a tag if we are going to settle later
-      byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
+         if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
+            messageReference.onDelivery(executeDelivery);
+            connection.runNow((Runnable)messageReference);
+         } else {
+            connection.runNow(() -> executeDelivery(messageReference));
+         }
 
-      // Let the Message decide how to present the message bytes
-      ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
-      boolean releaseRequired = sendBuffer instanceof NettyReadable;
+         // This is because on AMQP we only send messages based in credits, not bytes
+         return 1;
+      } finally {
 
-      try {
-         int size = sendBuffer.remaining();
+      }
+   }
 
-         while (!connection.tryLock(1, TimeUnit.SECONDS)) {
-            if (closed || sender.getLocalState() == EndpointState.CLOSED) {
-               // If we're waiting on the connection lock, the link might be in the process of closing.  If this happens
-               // we return.
-               return 0;
-            } else {
-               if (log.isDebugEnabled()) {
-                  log.debug("Couldn't get lock on deliverMessage " + this);
-               }
-            }
+   private void executeDelivery(MessageReference messageReference) {
+
+      try {
+         if (sender.getLocalState() == EndpointState.CLOSED) {
+            log.debug("Not delivering message " + messageReference + " as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times");
+            return;
          }
+         AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
+
+         sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection());
+
+         // Let the Message decide how to present the message bytes
+         ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount());
+         // we only need a tag if we are going to settle later
+         byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
+
+         boolean releaseRequired = sendBuffer instanceof NettyReadable;
+         final Delivery delivery;
+         delivery = sender.delivery(tag, 0, tag.length);
+         delivery.setMessageFormat((int) message.getMessageFormat());
+         delivery.setContext(messageReference);
 
          try {
-            final Delivery delivery;
-            delivery = sender.delivery(tag, 0, tag.length);
-            delivery.setMessageFormat((int) message.getMessageFormat());
-            delivery.setContext(messageReference);
 
             if (releaseRequired) {
                sender.send(sendBuffer);
@@ -730,7 +799,11 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount,
 
             if (preSettle) {
                // Presettled means the client implicitly accepts any delivery we send it.
-               sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
+               try {
+                  sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
+               } catch (Exception e) {
+                  log.debug(e.getMessage(), e);
+               }
                delivery.settle();
             } else {
                sender.advance();
@@ -738,14 +811,16 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount,
 
             connection.flush();
          } finally {
-            connection.unlock();
-         }
-
-         return size;
-      } finally {
-         if (releaseRequired) {
-            ((NettyReadable) sendBuffer).getByteBuf().release();
+            synchronized (creditsLock) {
+               pending.decrementAndGet();
+            }
+            if (releaseRequired) {
+               ((NettyReadable) sendBuffer).getByteBuf().release();
+            }
          }
+      } catch (Exception e) {
+         log.warn(e.getMessage(), e);
+         brokerConsumer.errorProcessing(e, messageReference);
       }
    }
 
@@ -806,13 +881,8 @@ private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
     * Update link state to reflect that the previous drain attempt has completed.
     */
    public void reportDrained() {
-      connection.lock();
-      try {
-         sender.drained();
-      } finally {
-         connection.unlock();
-      }
-
+      connection.requireInHandler();
+      sender.drained();
       connection.flush();
    }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
new file mode 100644
index 0000000000..9d0f09eb8a
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.proton.handler;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty Executor.
+ *  In that case this is a simple adapter for what's needed from these tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high cost.
+ *        We may do it some day if we find an easy way that won't clutter the code too much.
+ *  */
+public class ExecutorNettyAdapter implements EventLoop {
+
+   final ArtemisExecutor executor;
+
+   public ExecutorNettyAdapter(ArtemisExecutor executor) {
+      this.executor = executor;
+   }
+
+   @Override
+   public EventLoopGroup parent() {
+      return null;
+   }
+
+   @Override
+   public EventLoop next() {
+      return null;
+   }
+
+   @Override
+   public ChannelFuture register(Channel channel) {
+      return null;
+   }
+
+   @Override
+   public ChannelFuture register(ChannelPromise promise) {
+      return null;
+   }
+
+   @Override
+   public ChannelFuture register(Channel channel, ChannelPromise promise) {
+      return null;
+   }
+
+   @Override
+   public boolean inEventLoop() {
+      return inEventLoop(Thread.currentThread());
+   }
+
+   @Override
+   public boolean inEventLoop(Thread thread) {
+      return false;
+   }
+
+   @Override
+   public <V> Promise<V> newPromise() {
+      return null;
+   }
+
+   @Override
+   public <V> ProgressivePromise<V> newProgressivePromise() {
+      return null;
+   }
+
+   @Override
+   public <V> Future<V> newSucceededFuture(V result) {
+      return null;
+   }
+
+   @Override
+   public <V> Future<V> newFailedFuture(Throwable cause) {
+      return null;
+   }
+
+   @Override
+   public boolean isShuttingDown() {
+      return false;
+   }
+
+   @Override
+   public Future<?> shutdownGracefully() {
+      return null;
+   }
+
+   @Override
+   public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public Future<?> terminationFuture() {
+      return null;
+   }
+
+   @Override
+   public void shutdown() {
+
+   }
+
+   @Override
+   public List<Runnable> shutdownNow() {
+      return null;
+   }
+
+   @Override
+   public Iterator<EventExecutor> iterator() {
+      return null;
+   }
+
+   @Override
+   public Future<?> submit(Runnable task) {
+      execute(task);
+      return null;
+   }
+
+   @Override
+   public <T> Future<T> submit(Runnable task, T result) {
+      execute(task);
+      return null;
+   }
+
+   @Override
+   public <T> Future<T> submit(Callable<T> task) {
+      return null;
+   }
+
+   @Override
+   public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+      return null;
+   }
+
+   @Override
+   public boolean isShutdown() {
+      return false;
+   }
+
+   @Override
+   public boolean isTerminated() {
+      return false;
+   }
+
+   @Override
+   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return false;
+   }
+
+   @Override
+   public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+      return null;
+   }
+
+   @Override
+   public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+                                                             long timeout,
+                                                             TimeUnit unit) throws InterruptedException {
+      return null;
+   }
+
+   @Override
+   public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+      return null;
+   }
+
+   @Override
+   public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+                          long timeout,
+                          TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return null;
+   }
+
+   @Override
+   public void execute(Runnable command) {
+      executor.execute(command);
+   }
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 694c1d36e7..2f730fb304 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -16,22 +16,24 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 
+import javax.security.auth.Subject;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.security.auth.Subject;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.EventLoop;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
@@ -46,9 +48,6 @@
 import org.apache.qpid.proton.engine.impl.TransportInternal;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 public class ProtonHandler extends ProtonInitializable implements SaslListener {
 
    private static final Logger log = Logger.getLogger(ProtonHandler.class);
@@ -68,8 +67,6 @@
    private ServerSASL chosenMechanism;
    private ClientSASL clientSASLMechanism;
 
-   private final ReentrantLock lock = new ReentrantLock();
-
    private final long creationTime;
 
    private final boolean isServer;
@@ -80,17 +77,20 @@
 
    protected boolean receivedFirstPacket = false;
 
-   private final Executor flushExecutor;
+   private final EventLoop workerExecutor;
+
+   private final ArtemisExecutor poolExecutor;
 
    protected final ReadyListener readyListener;
 
    boolean inDispatch = false;
 
-   public ProtonHandler(Executor flushExecutor, boolean isServer) {
-      this.flushExecutor = flushExecutor;
-      this.readyListener = () -> this.flushExecutor.execute(() -> {
-         flush();
-      });
+   boolean scheduledFlush = false;
+
+   public ProtonHandler(EventLoop workerExecutor, ArtemisExecutor poolExecutor, boolean isServer) {
+      this.workerExecutor = workerExecutor;
+      this.poolExecutor = poolExecutor;
+      this.readyListener = () -> runLater(this::flush);
       this.creationTime = System.currentTimeMillis();
       this.isServer = isServer;
 
@@ -106,45 +106,33 @@ public ProtonHandler(Executor flushExecutor, boolean isServer) {
    }
 
    public Long tick(boolean firstTick) {
-      if (firstTick) {
-         // the first tick needs to guarantee a lock here
-         lock.lock();
-      } else {
-         if (!lock.tryLock()) {
-            log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly");
-            // if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here
-            return null;
-         }
-      }
-      try {
-         if (!firstTick) {
-            try {
-               if (connection.getLocalState() != EndpointState.CLOSED) {
-                  long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-                  if (transport.isClosed()) {
-                     throw new IllegalStateException("Channel was inactive for to long");
-                  }
-                  return rescheduleAt;
+      requireHandler();
+      if (!firstTick) {
+         try {
+            if (connection.getLocalState() != EndpointState.CLOSED) {
+               long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+               if (transport.isClosed()) {
+                  throw new IllegalStateException("Channel was inactive for to long");
                }
-            } catch (Exception e) {
-               log.warn(e.getMessage(), e);
-               transport.close();
-               connection.setCondition(new ErrorCondition());
+               return rescheduleAt;
             }
-            return 0L;
+         } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+            transport.close();
+            connection.setCondition(new ErrorCondition());
+         } finally {
+            flush();
          }
-         return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-      } finally {
-         lock.unlock();
-         flushBytes();
+         return 0L;
       }
+      return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
    }
 
    /**
     * We cannot flush until the initial handshake was finished.
     * If this happens before the handshake, the connection response will happen without SASL
     * and the client will respond and fail with an invalid code.
-    * */
+    */
    public void scheduledFlush() {
       if (receivedFirstPacket) {
          flush();
@@ -152,29 +140,17 @@ public void scheduledFlush() {
    }
 
    public int capacity() {
-      lock.lock();
-      try {
-         return transport.capacity();
-      } finally {
-         lock.unlock();
-      }
-   }
-
-   public void lock() {
-      lock.lock();
+      requireHandler();
+      return transport.capacity();
    }
 
-   public void unlock() {
-      lock.unlock();
-   }
-
-   public boolean tryLock(long time, TimeUnit timeUnit) {
-      try {
-         return lock.tryLock(time, timeUnit);
-      } catch (InterruptedException e) {
-
-         Thread.currentThread().interrupt();
-         return false;
+   public void requireHandler() {
+      // Guard for methods that must run on the worker event loop; callers on any other
+      // thread are expected to go through the executor (see runNow/runLater) instead.
+      if (!workerExecutor.inEventLoop()) {
+         // Programming error: log the offending call site and fail only this caller.
+         log.warn("Using inHandler is required", new Exception("trace"));
+         throw new IllegalStateException("this method requires to be called within the handler, use the executor");
       }
    }
 
@@ -192,21 +168,34 @@ public ProtonHandler addEventHandler(EventHandler handler) {
    }
 
    public void createServerSASL(String[] mechanisms) {
+      requireHandler();
       Sasl sasl = transport.sasl();
       sasl.server();
       sasl.setMechanisms(mechanisms);
       sasl.setListener(this);
    }
 
+
+
    public void flushBytes() {
+      requireHandler();
+
+      if (!scheduledFlush) {
+         scheduledFlush = true;
+         workerExecutor.execute(this::actualFlush);
+      }
+   }
+
+   private void actualFlush() {
+      requireHandler();
 
       for (EventHandler handler : handlers) {
          if (!handler.flowControl(readyListener)) {
+            scheduledFlush = false;
             return;
          }
       }
 
-      lock.lock();
       try {
          while (true) {
             ByteBuffer head = transport.head();
@@ -227,7 +216,7 @@ public void flushBytes() {
             transport.pop(pending);
          }
       } finally {
-         lock.unlock();
+         scheduledFlush = false;
       }
    }
 
@@ -236,36 +225,32 @@ public SASLResult getSASLResult() {
    }
 
    public void inputBuffer(ByteBuf buffer) {
+      requireHandler();
       dataReceived = true;
-      lock.lock();
-      try {
-         while (buffer.readableBytes() > 0) {
-            int capacity = transport.capacity();
+      while (buffer.readableBytes() > 0) {
+         int capacity = transport.capacity();
 
-            if (!receivedFirstPacket) {
-               handleFirstPacket(buffer);
-               // there is a chance that if SASL Handshake has been carried out that the capacity may change.
-               capacity = transport.capacity();
-            }
+         if (!receivedFirstPacket) {
+            handleFirstPacket(buffer);
+            // there is a chance that if SASL Handshake has been carried out that the capacity may change.
+            capacity = transport.capacity();
+         }
 
-            if (capacity > 0) {
-               ByteBuffer tail = transport.tail();
-               int min = Math.min(capacity, buffer.readableBytes());
-               tail.limit(min);
-               buffer.readBytes(tail);
+         if (capacity > 0) {
+            ByteBuffer tail = transport.tail();
+            int min = Math.min(capacity, buffer.readableBytes());
+            tail.limit(min);
+            buffer.readBytes(tail);
 
-               flush();
+            flush();
+         } else {
+            if (capacity == 0) {
+               log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
             } else {
-               if (capacity == 0) {
-                  log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
-               } else {
-                  log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
-               }
-               break;
+               log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
             }
+            break;
          }
-      } finally {
-         lock.unlock();
       }
    }
 
@@ -281,29 +266,55 @@ public long getCreationTime() {
       return creationTime;
    }
 
+   public void runOnPool(Runnable runnable) {
+      poolExecutor.execute(runnable);
+   }
+
+   public void runNow(Runnable runnable) {
+      if (workerExecutor.inEventLoop()) {
+         runnable.run();
+      } else {
+         workerExecutor.execute(runnable);
+      }
+   }
+
+   public void runLater(Runnable runnable) {
+      workerExecutor.execute(runnable);
+   }
+
    public void flush() {
-      lock.lock();
-      try {
+      if (workerExecutor.inEventLoop()) {
          transport.process();
-      } finally {
-         lock.unlock();
+         dispatch();
+      } else {
+         runLater(() -> {
+            transport.process();
+            dispatch();
+         });
       }
-
-      dispatch();
    }
 
-   public void close(ErrorCondition errorCondition) {
-      lock.lock();
-      try {
+   public void close(ErrorCondition errorCondition, AMQPConnectionContext connectionContext) {
+      runNow(() -> {
          if (errorCondition != null) {
             connection.setCondition(errorCondition);
          }
          connection.close();
-      } finally {
-         lock.unlock();
-      }
+         flush();
+      });
 
-      flush();
+      /*try {
+         Thread.sleep(1000);
+      } catch (Exception e) {
+         e.printStackTrace();
+      } */
+      // this needs to be done in two steps
+      // we first flush what we have to the client
+      // after flushed, we close the local connection
+      // otherwise this could close the netty connection before the Writable is complete
+      runLater(() -> {
+         connectionContext.getConnectionCallback().getTransportConnection().close();
+      });
    }
 
    // server side SASL Listener
@@ -462,45 +473,59 @@ private void dispatchRemoteMechanismChosen(final String mech) {
    private void dispatch() {
       Event ev;
 
-      lock.lock();
+      if (inDispatch) {
+         // Avoid recursion from events
+         return;
+      }
       try {
-         if (inDispatch) {
-            // Avoid recursion from events
-            return;
-         }
-         try {
-            inDispatch = true;
-            while ((ev = collector.peek()) != null) {
-               for (EventHandler h : handlers) {
-                  if (log.isTraceEnabled()) {
-                     log.trace("Handling " + ev + " towards " + h);
-                  }
-                  try {
-                     Events.dispatch(ev, h);
-                  } catch (Exception e) {
-                     log.warn(e.getMessage(), e);
-                     ErrorCondition error = new ErrorCondition();
-                     error.setCondition(AmqpError.INTERNAL_ERROR);
-                     error.setDescription("Unrecoverable error: " +
-                        (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
-                     connection.setCondition(error);
-                     connection.close();
-                  }
+         inDispatch = true;
+         while ((ev = collector.peek()) != null) {
+            for (EventHandler h : handlers) {
+               if (log.isTraceEnabled()) {
+                  log.trace("Handling " + ev + " towards " + h);
+               }
+               try {
+                  Events.dispatch(ev, h);
+               } catch (Exception e) {
+                  log.warn(e.getMessage(), e);
+                  ErrorCondition error = new ErrorCondition();
+                  error.setCondition(AmqpError.INTERNAL_ERROR);
+                  error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
+                  connection.setCondition(error);
+                  connection.close();
                }
-
-               collector.pop();
             }
 
-         } finally {
-            inDispatch = false;
+            collector.pop();
          }
+
       } finally {
-         lock.unlock();
+         inDispatch = false;
       }
 
       flushBytes();
    }
 
+
+   public void handleError(Exception e) {
+      if (workerExecutor.inEventLoop()) {
+         internalHandlerError(e);
+      } else {
+         runLater(() -> internalHandlerError(e));
+      }
+   }
+
+   private void internalHandlerError(Exception e) {
+      log.warn(e.getMessage(), e);
+      ErrorCondition error = new ErrorCondition();
+      error.setCondition(AmqpError.INTERNAL_ERROR);
+      error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
+      connection.setCondition(error);
+      connection.close();
+      flush();
+   }
+
+
    public void open(String containerId, Map<Symbol, Object> connectionProperties) {
       this.transport.open();
       this.connection.setContainer(containerId);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 78a5b33637..15803f497c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -107,14 +107,11 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
             IOCallback ioAction = new IOCallback() {
                @Override
                public void done() {
-                  connection.lock();
-                  try {
+                  connection.runLater(() -> {
                      delivery.settle();
                      delivery.disposition(declared);
-                  } finally {
-                     connection.unlock();
                      connection.flush();
-                  }
+                  });
                }
 
                @Override
@@ -133,15 +130,12 @@ public void onError(int errorCode, String errorMessage) {
             IOCallback ioAction = new IOCallback() {
                @Override
                public void done() {
-                  connection.lock();
-                  try {
+                  connection.runLater(() -> {
                      delivery.settle();
                      delivery.disposition(new Accepted());
                      currentTx = null;
-                  } finally {
-                     connection.unlock();
                      connection.flush();
-                  }
+                  });
                }
 
                @Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index ab4ff42981..4c5a887cf5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -25,11 +25,13 @@
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.qpid.proton.engine.Delivery;
 
-
 /**
  * AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled
  * or not.  This class extends the Core TransactionImpl used for normal TX behaviour.  In the case where deliveries
@@ -46,8 +48,22 @@
 
    private boolean discharged;
 
-   public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
+   public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
       super(xid, storageManager, timeoutSeconds);
+      addOperation(new TransactionOperationAbstract() {
+         @Override
+         public void afterCommit(Transaction tx) {
+            super.afterCommit(tx);
+            connection.runNow(() -> {
+               // Settle all unsettled deliveries if commit is successful
+               for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
+                  if (!p.getA().isSettled())
+                     p.getB().settle(p.getA());
+               }
+               connection.flush();
+            });
+         }
+      });
    }
 
    @Override
@@ -71,11 +87,6 @@ public void addDelivery(Delivery delivery, ProtonServerSenderContext context) {
    @Override
    public void commit() throws Exception {
       super.commit();
-
-      // Settle all unsettled deliveries if commit is successful
-      for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
-         if (!p.getA().isSettled()) p.getB().settle(p.getA());
-      }
    }
 
    public boolean isDischarged() {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
index 30814a92c4..349c32dd03 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -16,13 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.never;
-
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -34,29 +27,65 @@
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
+
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
 
 public class AMQPSessionCallbackTest {
 
-   @Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+   @Rule
+   public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+   @Mock
+   private AMQPConnectionCallback protonSPI;
+   @Mock
+   private ProtonProtocolManager manager;
+   @Mock
+   private AMQPConnectionContext connection;
+   @Mock
+   private Connection transportConnection;
+   @Mock
+   private Executor executor;
+   @Mock
+   private OperationContext operationContext;
+   @Mock
+   private Receiver receiver;
+   @Mock
+   private ActiveMQServer server;
+   @Mock
+   private PagingManager pagingManager;
+   @Mock
+   private PagingStore pagingStore;
+
+
+   @Before
+   public void setRule() {
+
+      // The connection will call the runnable now on this mock, as these would happen on a different thread.
+      Mockito.doAnswer(new Answer() {
+         @Override
+         public Void answer(InvocationOnMock invocation) throws Throwable {
+            ((Runnable) invocation.getArguments()[0]).run();
+            return null;
+         }
+      }).when(connection).runNow(Mockito.isA(Runnable.class));
 
-   @Mock private AMQPConnectionCallback protonSPI;
-   @Mock private ProtonProtocolManager manager;
-   @Mock private AMQPConnectionContext connection;
-   @Mock private Connection transportConnection;
-   @Mock private Executor executor;
-   @Mock private OperationContext operationContext;
-   @Mock private Receiver receiver;
-   @Mock private ActiveMQServer server;
-   @Mock private PagingManager pagingManager;
-   @Mock private PagingStore pagingStore;
+   }
 
    /**
     * Test that the AMQPSessionCallback grants no credit when not at threshold
@@ -69,8 +98,7 @@ public void testOfferProducerWithNoAddressDoesNotTopOffCreditAboveThreshold() {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is above threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
@@ -100,8 +128,7 @@ public void testOfferProducerWithNoAddressTopsOffCreditAtThreshold() {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@@ -132,8 +159,7 @@ public void testOfferProducerWithAddressDoesNotTopOffCreditAboveThreshold() thro
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is above threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
@@ -164,8 +190,7 @@ public void testOfferProducerWithAddressTopsOffCreditAtThreshold() throws Except
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@@ -195,8 +220,7 @@ public void testOfferProducerWithNoAddressDoesNotGrantNegativeCredit() {
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@@ -227,8 +251,7 @@ public void testOfferProducerWithAddressDoesNotGrantNegativeCredit() throws Exce
 
       // Capture credit runnable and invoke to trigger credit top off
       ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
-      AMQPSessionCallback session = new AMQPSessionCallback(
-         protonSPI, manager, connection, transportConnection, executor, operationContext);
+      AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
 
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 2402d09b15..893e3a746e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -18,6 +18,7 @@
 
 import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -31,7 +32,7 @@
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 import org.jboss.logging.Logger;
 
-public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference {
+public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference, Runnable {
 
    private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class);
 
@@ -74,6 +75,8 @@
 
    private long messageSize = -1;
 
+   private Consumer<? super MessageReference> onDelivery;
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -89,6 +92,27 @@ public Message getMessage() {
       return getPagedMessage().getMessage();
    }
 
+   @Override
+   public void onDelivery(Consumer<? super MessageReference> onDelivery) {
+      assert this.onDelivery == null;
+      this.onDelivery = onDelivery;
+   }
+
+   /**
+    * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
+    */
+   @Override
+   public void run() {
+      final Consumer<? super MessageReference> onDelivery = this.onDelivery;
+      if (onDelivery != null) {
+         try {
+            onDelivery.accept(this);
+         } finally {
+            this.onDelivery = null;
+         }
+      }
+   }
+
    @Override
    public synchronized PagedMessage getPagedMessage() {
       PagedMessage returnMessage = message != null ? message.get() : null;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
index f1e83d2e2d..bd6b705e7c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java
@@ -26,6 +26,10 @@
 
 public interface Binding extends UnproposalListener {
 
+   default boolean isLocal() {
+      return false;
+   }
+
    SimpleString getAddress();
 
    Bindable getBindable();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
index 30a268056d..053acfae7e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
@@ -26,6 +26,9 @@
 
 public interface Bindings extends UnproposalListener {
 
+   // this is to inform the parent there was an update on the bindings
+   void updated(QueueBinding binding);
+
    Collection<Binding> getBindings();
 
    void addBinding(Binding binding);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 56abddbc09..9cd6f73870 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -26,12 +26,14 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -63,6 +65,13 @@
 
    private final SimpleString name;
 
+   private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);
+
+   /**
+    * Version of these bindings; it is renewed whenever a binding is added, removed or updated
+    */
+   private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());
+
    public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
       this.groupingHandler = groupingHandler;
       this.name = name;
@@ -92,61 +101,78 @@ public void unproposed(SimpleString groupID) {
 
    @Override
    public void addBinding(final Binding binding) {
-      if (logger.isTraceEnabled()) {
-         logger.trace("addBinding(" + binding + ") being called");
-      }
-      if (binding.isExclusive()) {
-         exclusiveBindings.add(binding);
-      } else {
-         SimpleString routingName = binding.getRoutingName();
+      try {
+         if (logger.isTraceEnabled()) {
+            logger.trace("addBinding(" + binding + ") being called");
+         }
+         if (binding.isExclusive()) {
+            exclusiveBindings.add(binding);
+         } else {
+            SimpleString routingName = binding.getRoutingName();
 
-         List<Binding> bindings = routingNameBindingMap.get(routingName);
+            List<Binding> bindings = routingNameBindingMap.get(routingName);
 
-         if (bindings == null) {
-            bindings = new CopyOnWriteArrayList<>();
+            if (bindings == null) {
+               bindings = new CopyOnWriteArrayList<>();
 
-            List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
+               List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
+
+               if (oldBindings != null) {
+                  bindings = oldBindings;
+               }
+            }
 
-            if (oldBindings != null) {
-               bindings = oldBindings;
+            if (!bindings.contains(binding)) {
+               bindings.add(binding);
             }
          }
 
-         if (!bindings.contains(binding)) {
-            bindings.add(binding);
+         bindingsMap.put(binding.getID(), binding);
+
+         if (logger.isTraceEnabled()) {
+            logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
          }
+      } finally {
+         updated();
       }
 
-      bindingsMap.put(binding.getID(), binding);
+   }
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
-      }
+   @Override
+   public void updated(QueueBinding binding) {
+      updated();
+   }
 
+   private void updated() {
+      version.set(sequenceVersion.incrementAndGet());
    }
 
    @Override
    public void removeBinding(final Binding binding) {
-      if (binding.isExclusive()) {
-         exclusiveBindings.remove(binding);
-      } else {
-         SimpleString routingName = binding.getRoutingName();
+      try {
+         if (binding.isExclusive()) {
+            exclusiveBindings.remove(binding);
+         } else {
+            SimpleString routingName = binding.getRoutingName();
 
-         List<Binding> bindings = routingNameBindingMap.get(routingName);
+            List<Binding> bindings = routingNameBindingMap.get(routingName);
 
-         if (bindings != null) {
-            bindings.remove(binding);
+            if (bindings != null) {
+               bindings.remove(binding);
 
-            if (bindings.isEmpty()) {
-               routingNameBindingMap.remove(routingName);
+               if (bindings.isEmpty()) {
+                  routingNameBindingMap.remove(routingName);
+               }
             }
          }
-      }
 
-      bindingsMap.remove(binding.getID());
+         bindingsMap.remove(binding.getID());
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
+         if (logger.isTraceEnabled()) {
+            logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
+         }
+      } finally {
+         updated();
       }
    }
 
@@ -267,11 +293,9 @@ private void route(final Message message,
 
          if (binding.getFilter() == null || binding.getFilter().match(message)) {
             binding.getBindable().route(message, context);
-
             routed = true;
          }
       }
-
       if (!routed) {
          // Remove the ids now, in order to avoid double check
          ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
@@ -280,30 +304,53 @@ private void route(final Message message,
          SimpleString groupId = message.getGroupID();
 
          if (ids != null) {
+            context.clear();
             routeFromCluster(message, context, ids);
          } else if (groupingHandler != null && groupRouting && groupId != null) {
+            context.clear();
             routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
          } else {
-            if (logger.isTraceEnabled()) {
-               logger.trace("Routing message " + message + " on binding=" + this);
+            // as an optimization, we are reusing the previous context if everything is right for it
+            // so the simpleRouting will only happen if needed
+            if (!context.isReusable(message, version.get())) {
+               context.clear();
+               simpleRouting(message, context);
             }
-            for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
-               SimpleString routingName = entry.getKey();
+         }
+      }
+   }
 
-               List<Binding> bindings = entry.getValue();
+   private void simpleRouting(Message message, RoutingContext context) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Routing message " + message + " on binding=" + this);
+      }
 
-               if (bindings == null) {
-                  // The value can become null if it's concurrently removed while we're iterating - this is expected
-                  // ConcurrentHashMap behaviour!
-                  continue;
-               }
+      // We read the version before we start routing,
+      // so if something changes in between, the context is given the version we actually routed against
+      int currentVersion = version.get();
 
-               Binding theBinding = getNextBinding(message, routingName, bindings);
+      for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
+         SimpleString routingName = entry.getKey();
 
-               if (theBinding != null) {
-                  theBinding.route(message, context);
-               }
-            }
+         List<Binding> bindings = entry.getValue();
+
+         if (bindings == null) {
+            // The value can become null if it's concurrently removed while we're iterating - this is expected
+            // ConcurrentHashMap behaviour!
+            continue;
+         }
+
+         Binding theBinding = getNextBinding(message, routingName, bindings);
+
+         if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
+            context.setReusable(true, currentVersion);
+         } else {
+            // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
+            context.setReusable(false, currentVersion);
+         }
+
+         if (theBinding != null) {
+            theBinding.route(message, context);
          }
       }
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 79af5d075d..79ab4d3480 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -46,6 +46,11 @@ public LocalQueueBinding(final SimpleString address, final Queue queue, final Si
       clusterName = queue.getName().concat(nodeID);
    }
 
+   @Override
+   public boolean isLocal() {
+      return true;
+   }
+
    @Override
    public long getID() {
       return queue.getID();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 5346d6c736..bf12baf32f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -485,82 +485,91 @@ public QueueBinding updateQueue(SimpleString name,
             return null;
          }
 
-         final Queue queue = queueBinding.getQueue();
+         Bindings bindingsOnQueue = addressManager.getBindingsForRoutingAddress(queueBinding.getAddress());
 
-         boolean changed = false;
+         try {
 
-         //validate update
-         if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
-            final int consumerCount = queue.getConsumerCount();
-            if (consumerCount > maxConsumers) {
-               throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
+            final Queue queue = queueBinding.getQueue();
+
+            boolean changed = false;
+
+            //validate update
+            if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
+               final int consumerCount = queue.getConsumerCount();
+               if (consumerCount > maxConsumers) {
+                  throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
+               }
             }
-         }
-         if (routingType != null) {
-            final SimpleString address = queue.getAddress();
-            final AddressInfo addressInfo = addressManager.getAddressInfo(address);
-            final EnumSet<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
-            if (!addressRoutingTypes.contains(routingType)) {
-               throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
+            if (routingType != null) {
+               final SimpleString address = queue.getAddress();
+               final AddressInfo addressInfo = addressManager.getAddressInfo(address);
+               final EnumSet<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
+               if (!addressRoutingTypes.contains(routingType)) {
+                  throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
+               }
             }
-         }
 
-         //atomic update
-         if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
-            changed = true;
-            queue.setMaxConsumer(maxConsumers);
-         }
-         if (routingType != null && queue.getRoutingType() != routingType) {
-            changed = true;
-            queue.setRoutingType(routingType);
-         }
-         if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
-            changed = true;
-            queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
-         }
-         if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) {
-            changed = true;
-            queue.setExclusive(exclusive);
-         }
-         if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
-            changed = true;
-            queue.setNonDestructive(nonDestructive);
-         }
-         if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
-            changed = true;
-            queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
-         }
-         if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) {
-            changed = true;
-            queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
-         }
-         if (filter != null && !filter.equals(queue.getFilter())) {
-            changed = true;
-            queue.setFilter(filter);
-         }
-         if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
-            changed = true;
-            queue.setConfigurationManaged(configurationManaged);
-         }
-         if (logger.isDebugEnabled()) {
-            if (user == null && queue.getUser() != null) {
-               logger.debug("Ignoring updating Queue to a NULL user");
+            //atomic update
+            if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
+               changed = true;
+               queue.setMaxConsumer(maxConsumers);
+            }
+            if (routingType != null && queue.getRoutingType() != routingType) {
+               changed = true;
+               queue.setRoutingType(routingType);
+            }
+            if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
+               changed = true;
+               queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
+            }
+            if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) {
+               changed = true;
+               queue.setExclusive(exclusive);
+            }
+            if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
+               changed = true;
+               queue.setNonDestructive(nonDestructive);
+            }
+            if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
+               changed = true;
+               queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
+            }
+            if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) {
+               changed = true;
+               queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
+            }
+            if (filter != null && !filter.equals(queue.getFilter())) {
+               changed = true;
+               queue.setFilter(filter);
+            }
+            if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
+               changed = true;
+               queue.setConfigurationManaged(configurationManaged);
+            }
+            if (logger.isDebugEnabled()) {
+               if (user == null && queue.getUser() != null) {
+                  logger.debug("Ignoring updating Queue to a NULL user");
+               }
+            }
+            if (user != null && !user.equals(queue.getUser())) {
+               changed = true;
+               queue.setUser(user);
             }
-         }
-         if (user != null && !user.equals(queue.getUser())) {
-            changed = true;
-            queue.setUser(user);
-         }
 
-         if (changed) {
-            final long txID = storageManager.generateID();
-            try {
-               storageManager.updateQueueBinding(txID, queueBinding);
-               storageManager.commitBindings(txID);
-            } catch (Throwable throwable) {
-               storageManager.rollback(txID);
-               logger.warn(throwable.getMessage(), throwable);
-               throw throwable;
+            if (changed) {
+               final long txID = storageManager.generateID();
+               try {
+                  storageManager.updateQueueBinding(txID, queueBinding);
+                  storageManager.commitBindings(txID);
+               } catch (Throwable throwable) {
+                  storageManager.rollback(txID);
+                  logger.warn(throwable.getMessage(), throwable);
+                  throw throwable;
+               }
+            }
+         } finally {
+            if (bindingsOnQueue != null) {
+               bindingsOnQueue.updated(queueBinding);
             }
          }
 
@@ -876,6 +885,7 @@ public RoutingStatus route(final Message message,
       AddressInfo addressInfo = addressManager.getAddressInfo(address);
 
       if (bindingMove != null) {
+         context.clear();
          bindingMove.route(message, context);
          if (addressInfo != null) {
             addressInfo.incrementRoutedMessageCount();
@@ -1341,7 +1351,7 @@ public void onError(final int errorCode, final String errorMessage) {
 
             @Override
             public void done() {
-               addReferences(refs, direct);
+               context.processReferences(refs, direct);
             }
          });
       }
@@ -1476,16 +1486,7 @@ private boolean checkDuplicateID(final Message message,
       return true;
    }
 
-   /**
-    * @param refs
-    */
-   private void addReferences(final List<MessageReference> refs, final boolean direct) {
-      for (MessageReference ref : refs) {
-         ref.getQueue().addTail(ref, direct);
-      }
-   }
-
-   /**
+  /**
     * The expiry scanner can't be started until the whole server has been started other wise you may get races
     */
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 09d67ad075..87a3c30340 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -263,7 +263,7 @@ public Acceptor createAcceptor(TransportConfiguration info) {
 
          Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>();
          for (Entry<String, ProtocolManagerFactory> entry : selectedProtocolFactories.entrySet()) {
-            selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
+            selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getCombinedParams(), incomingInterceptors, outgoingInterceptors));
          }
 
          acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 43bed889ac..4b5509e087 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1048,7 +1048,7 @@ void slowConsumerDetected(String sessionID,
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222151, value = "removing consumer which did not handle a message, consumer={0}, message={1}",
       format = Message.Format.MESSAGE_FORMAT)
-   void removingBadConsumer(@Cause Throwable e, Consumer consumer, MessageReference reference);
+   void removingBadConsumer(@Cause Throwable e, Consumer consumer, Object reference);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222152, value = "Unable to decrement reference counting on queue",
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 6df48890da..1dfff29e1b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -46,6 +46,10 @@ default boolean supportsDirectDelivery() {
     */
    HandleStatus handle(MessageReference reference) throws Exception;
 
+   /** wakes up internal threads to deliver more messages */
+   default void promptDelivery() {
+   }
+
    /**
     * This will proceed with the actual delivery.
     * Notice that handle should hold a readLock and proceedDelivery should release the readLock
@@ -80,4 +84,8 @@ default boolean supportsDirectDelivery() {
 
    /** an unique sequential ID for this consumer */
    long sequentialID();
+
+   default void errorProcessing(Throwable e, MessageReference reference) {
+
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 2e2fb8d85a..905f93d7c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.core.server;
 
 
+import java.util.function.Consumer;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -44,6 +46,15 @@ public static MessageReference createReference(Message encode, final Queue queue
 
    SimpleString getLastValueProperty();
 
+   /**
+    * This is to be used in cases where a message delivery happens on an executor.
+    * Most MessageReference implementations will allow execution, and if it does,
+    * and the protocol requires an execution per message, this callback may be used.
+    *
+    * At the time of this implementation only AMQP was used.
+    */
+   void onDelivery(Consumer<? super MessageReference> callback);
+
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,
     * so we need to perform some extra steps on paging.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 8a120ea012..031d01a58a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -53,6 +53,10 @@
 
    void setRoutingType(RoutingType routingType);
 
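+   /** Returns whether the current queue and consumer settings allow use of the Reference Execution and callback.
+    *  This is because some queues reuse the same reference between multiple consumers, in which case the callback cannot be used. */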
+   boolean allowsReferenceCallback();
+
    boolean isDurable();
 
    /**
@@ -392,4 +396,8 @@ int moveReferences(int flushLimit,
    /** This is to perform a check on the counter again */
    void recheckRefCount(OperationContext context);
 
+   default void errorProcessing(Consumer consumer, Throwable t, MessageReference messageReference) {
+
+   }
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index 9b09256103..151aa41d94 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -26,6 +26,24 @@
 
 public interface RoutingContext {
 
+   /*
+     This will return true if the RoutingContext can be reused,
+     false if it cannot, or if it is not known yet
+     (the return type is a primitive boolean, so "unknown" is reported as false).
+
+
+     Once false, it can't be set to true
+   */
+   boolean isReusable();
+
+   int getPreviousBindingsVersion();
+
+   SimpleString getPreviousAddress();
+
+   void setReusable(boolean reusable);
+
+   RoutingContext setReusable(boolean reusable, int version);
+
    Transaction getTransaction();
 
    void setTransaction(Transaction transaction);
@@ -54,5 +72,16 @@
 
    SimpleString getAddress(Message message);
 
+   SimpleString getAddress();
+
    RoutingType getRoutingType();
+
+   RoutingType getPreviousRoutingType();
+
+   void processReferences(List<MessageReference> refs, boolean direct);
+
+   boolean isReusable(Message message, int version);
+
+
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index eb055631e4..4d3591954b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -31,6 +31,10 @@
 
    void fireSlowConsumer();
 
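+   /** Returns whether the current queue settings allow use of the Reference Execution and callback.
+    *  This is because browse-only consumers, and queues that do not allow it, cannot use the callback from the MessageReference. */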
+   boolean allowReferenceCallback();
+
    /**
     * this is to be used with anything specific on a protocol head.
     */
@@ -105,6 +109,4 @@
    long getCreationTime();
 
    String getSessionID();
-
-   void promptDelivery();
 }
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 37442b2c42..cfc3e014d2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.core.Message;
@@ -43,6 +44,8 @@
 
    Object getConnectionID();
 
+   Executor getSessionExecutor();
+
    /**
     * Certain protocols may create an internal session that shouldn't go through security checks.
     * make sure you don't expose this property through any protocol layer as that would be a security breach
@@ -241,12 +244,26 @@ RoutingStatus send(Transaction tx,
                       boolean direct,
                       boolean noAutoCreateQueue) throws Exception;
 
+   RoutingStatus send(Transaction tx,
+                      Message message,
+                      boolean direct,
+                      boolean noAutoCreateQueue,
+                      RoutingContext routingContext) throws Exception;
+
+
    RoutingStatus doSend(Transaction tx,
                         Message msg,
                         SimpleString originalAddress,
                         boolean direct,
                         boolean noAutoCreateQueue) throws Exception;
 
+   RoutingStatus doSend(Transaction tx,
+                        Message msg,
+                        SimpleString originalAddress,
+                        boolean direct,
+                        boolean noAutoCreateQueue,
+                        RoutingContext routingContext) throws Exception;
+
    RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
 
    RoutingStatus send(Message message, boolean direct) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 315b926684..0ebd7a80d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -21,6 +21,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -49,6 +50,7 @@
  * This is useful for example, for stock prices, where you're only interested in the latest value
  * for a particular stock
  */
+@SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
    private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
@@ -146,6 +148,11 @@ public synchronized void addHead(final MessageReference ref, boolean scheduling)
       }
    }
 
+   @Override
+   public boolean allowsReferenceCallback() {
+      return false;
+   }
+
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
@@ -231,6 +238,11 @@ public SimpleString getLastValueKey() {
          this.ref = ref;
       }
 
+      @Override
+      public void onDelivery(Consumer<? super MessageReference> callback) {
+         // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
+      }
+
       MessageReference getReference() {
          return ref;
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 2401c4a405..12acffd579 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -30,7 +31,7 @@
 /**
  * Implementation of a MessageReference
  */
-public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
+public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
 
    private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
       .newUpdater(MessageReferenceImpl.class, "deliveryCount");
@@ -54,6 +55,8 @@
 
    private Object protocolData;
 
+   private Consumer<? super MessageReference> onDelivery;
+
    // Static --------------------------------------------------------
 
    private static final int memoryOffset = 64;
@@ -84,6 +87,27 @@ public MessageReferenceImpl(final Message message, final Queue queue) {
 
    // MessageReference implementation -------------------------------
 
+   @Override
+   public void onDelivery(Consumer<? super MessageReference> onDelivery) {
+      assert this.onDelivery == null;
+      this.onDelivery = onDelivery;
+   }
+
+   /**
+    * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
+    */
+   @Override
+   public void run() {
+      final Consumer<? super MessageReference> onDelivery = this.onDelivery;
+      if (onDelivery != null) {
+         try {
+            onDelivery.accept(this);
+         } finally {
+            this.onDelivery = null;
+         }
+      }
+   }
+
    @Override
    public Object getProtocolData() {
       return protocolData;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 292cfc1e52..1752eb28c4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -535,6 +535,13 @@ public QueueImpl(final long id,
 
    // Bindable implementation -------------------------------------------------------------------------------------
 
+   @Override
+   public boolean allowsReferenceCallback() {
+      // non destructive queues will reuse the same reference between multiple consumers
+      // so you cannot really use the callback from the MessageReference
+      return !nonDestructive;
+   }
+
    public SimpleString getRoutingName() {
       return name;
    }
@@ -627,8 +634,11 @@ public synchronized void setNonDestructive(boolean nonDestructive) {
 
    @Override
    public void route(final Message message, final RoutingContext context) throws Exception {
-      if (purgeOnNoConsumers && getConsumerCount() == 0) {
-         return;
+      if (purgeOnNoConsumers) {
+         context.setReusable(false);
+         if (getConsumerCount() == 0) {
+            return;
+         }
       }
       context.addQueue(address, this);
    }
@@ -849,11 +859,11 @@ public void addTail(final MessageReference ref, final boolean direct) {
                   // Go into direct delivery mode
                   directDeliver = supportsDirectDeliver;
                   if (logger.isTraceEnabled()) {
-                     logger.trace("Setting direct deliverer to " + supportsDirectDeliver);
+                     logger.trace("Setting direct deliverer to " + supportsDirectDeliver + " on queue " + this.getName());
                   }
                } else {
                   if (logger.isTraceEnabled()) {
-                     logger.trace("Couldn't set direct deliver back");
+                     logger.trace("Couldn't set direct deliver back on queue " + this.getName());
                   }
                }
             }
@@ -1414,6 +1424,7 @@ public void acknowledge(final MessageReference ref, final ServerConsumer consume
    @Override
    public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
       if (nonDestructive && reason == AckReason.NORMAL) {
+         decDelivering(ref);
          if (logger.isDebugEnabled()) {
             logger.debug("acknowledge ignored nonDestructive=true and reason=NORMAL");
          }
@@ -3141,6 +3152,10 @@ private boolean deliverDirect(final MessageReference ref) {
                break;
             }
          }
+
+         if (logger.isTraceEnabled()) {
+            logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
+         }
          return false;
       }
    }
@@ -3160,24 +3175,29 @@ private void proceedDeliver(Consumer consumer, MessageReference reference) {
       try {
          consumer.proceedDeliver(reference);
       } catch (Throwable t) {
-         ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
-
-         synchronized (this) {
-            // If the consumer throws an exception we remove the consumer
-            try {
-               removeConsumer(consumer);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
-            }
-
-            // The message failed to be delivered, hence we try again
-            addHead(reference, false);
-         }
+         errorProcessing(consumer, t, reference);
       } finally {
          deliveriesInTransit.countDown();
       }
    }
 
+   /** This will print errors and decide what to do with the errored consumer from the protocol layer. */
+   @Override
+   public void errorProcessing(Consumer consumer, Throwable t, MessageReference reference) {
+      synchronized (this) {
+         ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
+         // If the consumer throws an exception we remove the consumer
+         try {
+            removeConsumer(consumer);
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
+         }
+
+         // The message failed to be delivered, hence we try again
+         addHead(reference, false);
+      }
+   }
+
    private boolean checkExpired(final MessageReference reference) {
       try {
          if (reference.getMessage().isExpired()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 29a70e44bc..b5b36bb2b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -20,9 +20,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -40,19 +42,68 @@
 
    private SimpleString address;
 
+   private SimpleString previousAddress;
+
+   private RoutingType previousRoutingType;
+
    private RoutingType routingType;
 
+   Boolean reusable = null;
+
+   volatile int version;
+
+   private final Executor executor;
+
    public RoutingContextImpl(final Transaction transaction) {
+      this(transaction, null);
+   }
+
+   public RoutingContextImpl(final Transaction transaction, Executor executor) {
       this.transaction = transaction;
+      this.executor = executor;
    }
 
    @Override
-   public void clear() {
-      transaction = null;
+   public boolean isReusable() {
+      return reusable != null && reusable;
+   }
+
+   @Override
+   public int getPreviousBindingsVersion() {
+      return version;
+   }
+
+   @Override
+   public SimpleString getPreviousAddress() {
+      return previousAddress;
+   }
+
+   @Override
+   public void setReusable(boolean reusable) {
+      this.reusable = reusable;
+   }
+   @Override
+   public RoutingContext setReusable(boolean reusable, int previousBindings) {
+      this.version = previousBindings;
+      this.previousAddress = address;
+      this.previousRoutingType = routingType;
+      if (this.reusable != null && !this.reusable.booleanValue()) {
+         // cannot set to Reusable once it was set to false
+         return this;
+      }
+      this.reusable = reusable;
+      return this;
+   }
 
+   @Override
+   public void clear() {
       map.clear();
 
       queueCount = 0;
+
+      this.version = 0;
+
+      this.reusable = null;
    }
 
    @Override
@@ -69,6 +120,18 @@ public void addQueue(final SimpleString address, final Queue queue) {
       queueCount++;
    }
 
+   @Override
+   public void processReferences(final List<MessageReference> refs, final boolean direct) {
+      internalprocessReferences(refs, direct);
+   }
+
+   private void internalprocessReferences(final List<MessageReference> refs, final boolean direct) {
+      for (MessageReference ref : refs) {
+         ref.getQueue().addTail(ref, direct);
+      }
+   }
+
+
    @Override
    public void addQueueWithAck(SimpleString address, Queue queue) {
       addQueue(address, queue);
@@ -82,6 +145,11 @@ public boolean isAlreadyAcked(SimpleString address, Queue queue) {
       return listing == null ? false : listing.isAlreadyAcked(queue);
    }
 
+   @Override
+   public boolean isReusable(Message message, int version) {
+      return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType && getPreviousBindingsVersion() == version;
+   }
+
    @Override
    public void setAddress(SimpleString address) {
       this.address = address;
@@ -100,11 +168,21 @@ public SimpleString getAddress(Message message) {
       return address;
    }
 
+   @Override
+   public SimpleString getAddress() {
+      return address;
+   }
+
    @Override
    public RoutingType getRoutingType() {
       return routingType;
    }
 
+   @Override
+   public RoutingType getPreviousRoutingType() {
+      return previousRoutingType;
+   }
+
    @Override
    public RouteContextList getContextListing(SimpleString address) {
       RouteContextList listing = map.get(address);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index f7a89d7a66..19e395646b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -256,6 +256,15 @@ public void readyForWriting() {
    // ----------------------------------------------------------------------
 
 
+   @Override
+   public boolean allowReferenceCallback() {
+      if (browseOnly) {
+         return false;
+      } else {
+         return messageQueue.allowsReferenceCallback();
+      }
+   }
+
    @Override
    public long sequentialID() {
       return sequentialID;
@@ -346,6 +355,10 @@ public boolean supportsDirectDelivery() {
       return callback.supportsDirectDelivery();
    }
 
+   @Override
+   public void errorProcessing(Throwable e, MessageReference deliveryObject) {
+      messageQueue.errorProcessing(this, e, deliveryObject);
+   }
 
    @Override
    public HandleStatus handle(final MessageReference ref) throws Exception {
@@ -582,13 +595,14 @@ public void removeItself() throws Exception {
    public void forceDelivery(final long sequence)  {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
+         MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
+         reference.setDeliveryCount(0);
 
          forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
          forcedDeliveryMessage.setAddress(messageQueue.getName());
 
          applyPrefixForLegacyConsumer(forcedDeliveryMessage);
-         callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
-
+         callback.sendMessage(reference, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
       });
    }
 
@@ -949,7 +963,7 @@ public synchronized void individualAcknowledge(Transaction tx, final long messag
       } catch (ActiveMQException e) {
          if (startedTransaction) {
             tx.rollback();
-         } else {
+         } else if (tx != null) {
             tx.markAsRollbackOnly(e);
          }
          throw e;
@@ -958,7 +972,7 @@ public synchronized void individualAcknowledge(Transaction tx, final long messag
          ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
          if (startedTransaction) {
             tx.rollback();
-         } else {
+         } else if (tx != null) {
             tx.markAsRollbackOnly(hqex);
          }
          throw hqex;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 3bc60f2084..11b096baf6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.Closeable;
@@ -190,6 +191,8 @@
 
    private Set<Closeable> closeables;
 
+   private final Executor sessionExecutor;
+
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -264,6 +267,8 @@ public ServerSessionImpl(final String name,
       remotingConnection.addFailureListener(this);
       this.context = context;
 
+      this.sessionExecutor = server.getExecutorFactory().getExecutor();
+
       if (!xa) {
          tx = newTransaction();
       }
@@ -283,6 +288,11 @@ public void addCloseable(Closeable closeable) {
       this.closeables.add(closeable);
    }
 
+   @Override
+   public Executor getSessionExecutor() {
+      return sessionExecutor;
+   }
+
    @Override
    public void disableSecurity() {
       this.securityEnabled = false;
@@ -1467,12 +1477,20 @@ private LargeServerMessage messageToLargeMessage(Message message) throws Excepti
       return lsm;
    }
 
-
    @Override
    public synchronized RoutingStatus send(Transaction tx,
                                           Message msg,
                                           final boolean direct,
                                           boolean noAutoCreateQueue) throws Exception {
+      return send(tx, msg, direct, noAutoCreateQueue, routingContext);
+   }
+
+   @Override
+   public synchronized RoutingStatus send(Transaction tx,
+                                          Message msg,
+                                          final boolean direct,
+                                          boolean noAutoCreateQueue,
+                                          RoutingContext routingContext) throws Exception {
 
       final Message message;
       if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) {
@@ -1527,7 +1545,7 @@ public synchronized RoutingStatus send(Transaction tx,
 
             result = handleManagementMessage(tx, message, direct);
          } else {
-            result = doSend(tx, message, address, direct, noAutoCreateQueue);
+            result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext);
          }
 
       } catch (Exception e) {
@@ -1766,7 +1784,7 @@ private RoutingStatus handleManagementMessage(final Transaction tx,
          }
          reply.setAddress(replyTo);
 
-         doSend(tx, reply, null, direct, false);
+         doSend(tx, reply, null, direct, false, routingContext);
       }
 
       return RoutingStatus.OK;
@@ -1823,12 +1841,24 @@ public void afterRollback(Transaction tx) {
       theTx.rollback();
    }
 
+
    @Override
    public synchronized RoutingStatus doSend(final Transaction tx,
                                             final Message msg,
                                             final SimpleString originalAddress,
                                             final boolean direct,
                                             final boolean noAutoCreateQueue) throws Exception {
+      return doSend(tx, msg, originalAddress, direct, noAutoCreateQueue, routingContext);
+   }
+
+
+   @Override
+   public synchronized RoutingStatus doSend(final Transaction tx,
+                                            final Message msg,
+                                            final SimpleString originalAddress,
+                                            final boolean direct,
+                                            final boolean noAutoCreateQueue,
+                                            final RoutingContext routingContext) throws Exception {
 
       RoutingStatus result = RoutingStatus.OK;
 
@@ -1861,6 +1891,7 @@ public synchronized RoutingStatus doSend(final Transaction tx,
       }
 
       if (tx == null || autoCommitSends) {
+         routingContext.setTransaction(null);
       } else {
          routingContext.setTransaction(tx);
       }
@@ -1880,7 +1911,9 @@ public synchronized RoutingStatus doSend(final Transaction tx,
             value.getB().incrementAndGet();
          }
       } finally {
-         routingContext.clear();
+         if (!routingContext.isReusable()) {
+            routingContext.clear();
+         }
       }
       return result;
    }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 0ef7804290..2e3b691dca 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -793,6 +793,11 @@ public void setPurgeOnNoConsumers(boolean value) {
 
       }
 
+      @Override
+      public boolean allowsReferenceCallback() {
+         return false;
+      }
+
       @Override
       public int getConsumersBeforeDispatch() {
          return 0;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 05e763f2b9..550adbe6ed 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -250,7 +250,7 @@ public void testPurgeOnNoConsumersTrue() throws Exception {
 
       // there are no consumers so no messages should be routed to the queue
       producer.send(session.createMessage(true));
-      assertEquals(0, queue.getMessageCount());
+      Wait.assertEquals(0, queue::getMessageCount);
    }
 
    @Test
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index 4529efbd4d..cfcbc209cb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -138,7 +138,7 @@ public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception
       sender.send(message);
       sender.close();
 
-      assertEquals(1, queueView.getMessageCount());
+      Wait.assertEquals(1, queueView::getMessageCount);
 
       // Now try and get the message
       AmqpReceiver receiver = session.createReceiver(getQueueName());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
index 2f65dfb20d..c6119a188b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
@@ -19,6 +19,7 @@
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -74,7 +75,8 @@ public void testMesagesNotSent() throws Exception {
          }
          receiver.close();
          session2.close();
-         assertEquals(1000, sender.getSender().getCredit());
+
+         Wait.assertEquals(1000, sender.getSender()::getCredit);
          for (int i = 0; i < 1000; i++) {
             final AmqpMessage message = new AmqpMessage();
             byte[] payload = new byte[100];
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index ea62df8daf..9dc1138798 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -47,6 +47,7 @@
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Sender;
 import org.jgroups.util.UUID;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1154,4 +1155,52 @@ public void run() {
       receiver.close();
       connection.close();
    }
+
+
+
+   @Test(timeout = 60000)
+   public void testReceiveRejecting() throws Exception {
+      final int MSG_COUNT = 1000;
+
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      final String address = getQueueName();
+
+
+      AmqpSender sender = session.createSender(address);
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setMessageId("msg" + i);
+         sender.send(message);
+      }
+
+
+
+      Queue queueView = getProxyToQueue(address);
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         final AmqpReceiver receiver = session.createReceiver(address);
+
+         receiver.flow(MSG_COUNT);
+         AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(received);
+         Assert.assertEquals("msg" + i, received.getMessageId());
+         received.accept();
+         receiver.close();
+      }
+      final AmqpReceiver receiver = session.createReceiver(address);
+      receiver.flow(MSG_COUNT);
+
+      Assert.assertNull(receiver.receive(1, TimeUnit.MILLISECONDS));
+
+
+      Wait.assertEquals(0, queueView::getDeliveringCount);
+
+      connection.close();
+   }
+
+
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index 96d5f1c9d5..e35635d0d7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -766,7 +766,7 @@ public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettle
 
          // We should have now drained the Queue
          receiver.flow(1);
-         AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+         AmqpMessage message = receiver.receive(1, TimeUnit.SECONDS);
          if (message != null) {
             System.out.println("Read message: " + message.getApplicationProperty("msgId"));
          }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 50ab38966d..7b1f155f70 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -324,7 +324,7 @@ private void receive(ConnectionSupplier consumerConnectionSupplier, String queue
          Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue consumerQueue = consumerSession.createQueue(queueName);
          MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
-         TextMessage msg = (TextMessage) consumer.receive(200);
+         TextMessage msg = (TextMessage) consumer.receive(2000);
          assertNotNull(msg);
          consumer.close();
       }
@@ -336,7 +336,7 @@ private void receiveNull(ConnectionSupplier consumerConnectionSupplier, String q
          Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue consumerQueue = consumerSession.createQueue(queueName);
          MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
-         TextMessage msg = (TextMessage) consumer.receive(200);
+         TextMessage msg = (TextMessage) consumer.receive(2000);
          assertNull(msg);
          consumer.close();
       }
@@ -349,8 +349,8 @@ private void receiveDualConsumer(ConnectionSupplier consumerConnectionSupplier,
          MessageConsumer consumer = createConsumer(consumerConnection, queueName);
          MessageConsumer consumer2 = createConsumer(consumerConnection2, queueName);
 
-         TextMessage msg = (TextMessage) consumer.receive(200);
-         TextMessage msg2 = (TextMessage) consumer2.receive(200);
+         TextMessage msg = (TextMessage) consumer.receive(2000);
+         TextMessage msg2 = (TextMessage) consumer2.receive(2000);
 
          assertNotNull(msg);
          assertNotNull(msg2);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 1b790a072d..58bf2d3d5a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -45,6 +45,11 @@ public void fireSlowConsumer() {
 
    }
 
+   @Override
+   public boolean allowReferenceCallback() {
+      return false;
+   }
+
    @Override
    public Object getProtocolData() {
       return null;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index f103d4182f..39200e68e3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -500,10 +500,16 @@ public void internalSend(int protocolSender, int protocolConsumer) throws Throwa
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          javax.jms.Queue queue = session.createQueue(QUEUE.toString());
          MessageProducer producer = session.createProducer(queue);
-         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         if (durable) {
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         } else {
+
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         }
 
          long time = System.currentTimeMillis();
-         int NUMBER_OF_MESSAGES = 100;
+         int NUMBER_OF_MESSAGES = durable ? 500 : 5000;
          for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
             TextMessage msg = session.createTextMessage("hello " + i);
             msg.setIntProperty("mycount", i);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index dc57a12aa2..3e64ac5fc7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -241,6 +241,11 @@ public void testHangDuplicateQueues() throws Exception {
                   addressSettingsRepository, executor, server, null);
          }
 
+         @Override
+         public boolean allowsReferenceCallback() {
+            return false;
+         }
+
          @Override
          public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter) throws Exception {
             latchDelete.countDown();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index bea116770f..15ac691f6c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -83,6 +83,11 @@ public void setDispatching(boolean dispatching) {
 
    }
 
+   @Override
+   public boolean allowsReferenceCallback() {
+      return false;
+   }
+
    @Override
    public boolean isExclusive() {
       // no-op
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
index 40fadf9d9f..01402635dd 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
@@ -31,6 +31,7 @@
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -334,6 +335,10 @@ public MessageLoadBalancingType getMessageLoadBalancingType() {
       public void unproposed(SimpleString groupID) {
       }
 
+      @Override
+      public void updated(QueueBinding binding) {
+      }
+
       @Override
       public boolean redistribute(Message message,
                                   Queue originatingQueue,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services