[activemq-artemis] branch master updated: ARTEMIS-2607 interceptor returns false but processing continues

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] branch master updated: ARTEMIS-2607 interceptor returns false but processing continues

clebertsuconic-2
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new a8cf6b0  ARTEMIS-2607 interceptor returns false but processing continues
     new 9a24b89  This closes #2974
a8cf6b0 is described below

commit a8cf6b04b4c54670f4b206cae917f4222e29201f
Author: Justin Bertram <[hidden email]>
AuthorDate: Fri Jan 31 14:21:20 2020 -0600

    ARTEMIS-2607 interceptor returns false but processing continues
---
 .../amqp/broker/AMQPConnectionCallback.java        |  8 +-
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 69 ++++++++---------
 .../amqp/broker/ProtonProtocolManager.java         |  8 +-
 .../amqp/proton/ProtonServerSenderContext.java     |  4 +-
 .../core/protocol/mqtt/MQTTProtocolHandler.java    | 10 ++-
 .../core/protocol/mqtt/MQTTProtocolManager.java    |  8 +-
 .../core/protocol/stomp/StompProtocolManager.java  |  8 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |  2 +-
 .../spi/core/protocol/AbstractProtocolManager.java |  8 +-
 .../amqp/AmqpSendReceiveInterceptorTest.java       | 67 +++++++++++++++++
 .../imported/MQTTRejectingInterceptorTest.java     | 64 ++++++++++++++++
 .../stomp/StompWithInterceptorsTest.java           |  6 +-
 .../stomp/StompWithRejectingInterceptorTest.java   | 86 ++++++++++++++++++++++
 13 files changed, 291 insertions(+), 57 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 1667945..a6cad89 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -294,11 +294,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
       return null;
    }
 
-   public void invokeIncomingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
-      manager.invokeIncoming(message, connection);
+   public String invokeIncomingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+      return manager.invokeIncoming(message, connection);
    }
 
-   public void invokeOutgoingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
-      manager.invokeOutgoing(message, connection);
+   public String invokeOutgoingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+      return manager.invokeOutgoing(message, connection);
    }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index a65361d..dad5d54 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -507,36 +507,39 @@ public class AMQPSessionCallback implements SessionCallback {
                            final Receiver receiver,
                            final RoutingContext routingContext) throws Exception {
       message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
-      invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
-      serverSession.send(transaction, message, directDeliver, false, routingContext);
-
-      afterIO(new IOCallback() {
-         @Override
-         public void done() {
-            connection.runLater(() -> {
-               if (delivery.getRemoteState() instanceof TransactionalState) {
-                  TransactionalState txAccepted = new TransactionalState();
-                  txAccepted.setOutcome(Accepted.getInstance());
-                  txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
-
-                  delivery.disposition(txAccepted);
-               } else {
-                  delivery.disposition(Accepted.getInstance());
-               }
-               delivery.settle();
-               context.flow();
-               connection.flush();
-            });
-         }
+      if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) {
+         serverSession.send(transaction, message, directDeliver, false, routingContext);
+
+         afterIO(new IOCallback() {
+            @Override
+            public void done() {
+               connection.runLater(() -> {
+                  if (delivery.getRemoteState() instanceof TransactionalState) {
+                     TransactionalState txAccepted = new TransactionalState();
+                     txAccepted.setOutcome(Accepted.getInstance());
+                     txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
+
+                     delivery.disposition(txAccepted);
+                  } else {
+                     delivery.disposition(Accepted.getInstance());
+                  }
+                  delivery.settle();
+                  context.flow();
+                  connection.flush();
+               });
+            }
 
-         @Override
-         public void onError(int errorCode, String errorMessage) {
-            connection.runNow(() -> {
-               receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
-               connection.flush();
-            });
-         }
-      });
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+               connection.runNow(() -> {
+                  receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
+                  connection.flush();
+               });
+            }
+         });
+      } else {
+         rejectMessage(delivery, Symbol.valueOf("failed"), "Interceptor rejected message");
+      }
    }
 
    /** Will execute a Runnable on an Address when there's space in memory*/
@@ -692,12 +695,12 @@ public class AMQPSessionCallback implements SessionCallback {
       manager.getServer().getSecurityStore().check(address, checkType, session);
    }
 
-   public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
-      protonSPI.invokeIncomingInterceptors(message, connection);
+   public String invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+      return protonSPI.invokeIncomingInterceptors(message, connection);
    }
 
-   public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
-      protonSPI.invokeOutgoingInterceptors(message, connection);
+   public String invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+      return protonSPI.invokeOutgoingInterceptors(message, connection);
    }
 
    public void addProducer(ServerProducer serverProducer) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 121020c..e0af481 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -294,12 +294,12 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
       return prefixes;
    }
 
-   public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
-      super.invokeInterceptors(this.incomingInterceptors, message, connection);
+   public String invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+      return super.invokeInterceptors(this.incomingInterceptors, message, connection);
    }
 
-   public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
-      super.invokeInterceptors(this.outgoingInterceptors, message, connection);
+   public String invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {
+      return super.invokeInterceptors(this.outgoingInterceptors, message, connection);
    }
 
    public int getInitialRemoteMaxFrameSize() {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 3bb9b4a..3ab2699 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -787,7 +787,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
          AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
 
-         sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection());
+         if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
+            return;
+         }
 
          // Let the Message decide how to present the message bytes
          ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount());
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index e7388e8..8b0aae8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -100,7 +100,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
 
          MQTTUtil.logMessage(session.getState(), message, true);
 
-         this.protocolManager.invokeIncoming(message, this.connection);
+         if (this.protocolManager.invokeIncoming(message, this.connection) != null) {
+            log.debugf("Interceptor rejected MQTT message: %s", message);
+            disconnect(true);
+            return;
+         }
 
          switch (message.fixedHeader().messageType()) {
             case CONNECT:
@@ -246,8 +250,10 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    }
 
    private void sendToClient(MqttMessage message) {
+      if (this.protocolManager.invokeOutgoing(message, connection) != null) {
+         return;
+      }
       MQTTUtil.logMessage(session.getSessionState(), message, false);
-      this.protocolManager.invokeOutgoing(message, connection);
       ctx.write(message);
       ctx.flush();
    }
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index d1777ea..d0f5d12 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -209,12 +209,12 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
       return websocketRegistryNames;
    }
 
-   public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
-      super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
+   public String invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
+      return super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
    }
 
-   public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
-      super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
+   public String invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
+      return super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
    }
 
    public boolean isClientConnected(String clientId, MQTTConnection connection) {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index e9bad87..2fe6224 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -154,8 +154,10 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
          }
 
          try {
-            invokeInterceptors(this.incomingInterceptors, request, conn);
             conn.logFrame(request, true);
+            if (invokeInterceptors(this.incomingInterceptors, request, conn) != null) {
+               return;
+            }
             conn.handleFrame(request);
          } finally {
             server.getStorageManager().clearContext();
@@ -187,7 +189,9 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
    // Public --------------------------------------------------------
 
    public boolean send(final StompConnection connection, final StompFrame frame) {
-      invokeInterceptors(this.outgoingInterceptors, frame, connection);
+      if (invokeInterceptors(this.outgoingInterceptors, frame, connection) != null) {
+         return false;
+      }
       connection.logFrame(frame, false);
 
       synchronized (connection) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index fb92e2d..84de5ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1967,7 +1967,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
 
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224082, value = "Failed to invoke an interceptor", format = Message.Format.MESSAGE_FORMAT)
-   void failedToInvokeAninterceptor(@Cause Exception e);
+   void failedToInvokeAnInterceptor(@Cause Exception e);
 
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224083, value = "Failed to close context", format = Message.Format.MESSAGE_FORMAT)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
index aaedecd..e27699b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
@@ -32,18 +32,20 @@ public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
 
-   protected void invokeInterceptors(final List<I> interceptors, final P message, final C connection) {
+   protected String invokeInterceptors(final List<I> interceptors, final P message, final C connection) {
       if (interceptors != null && !interceptors.isEmpty()) {
          for (I interceptor : interceptors) {
             try {
                if (!interceptor.intercept(message, connection)) {
-                  break;
+                  return interceptor.getClass().getName();
                }
             } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.failedToInvokeAninterceptor(e);
+               ActiveMQServerLogger.LOGGER.failedToInvokeAnInterceptor(e);
             }
          }
       }
+
+      return null;
    }
 
    @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
index 8dcb2bf..0dfcda2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
@@ -80,6 +80,73 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport {
       connection.close();
    }
 
+   @Test(timeout = 60000)
+   public void testRejectMessageWithIncomingInterceptor() throws Exception {
+      final CountDownLatch latch = new CountDownLatch(1);
+      server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() {
+         @Override
+         public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException {
+            latch.countDown();
+            return false;
+         }
+      });
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("msg" + 1);
+      message.setText("Test-Message");
+      try {
+         sender.send(message);
+         fail("Sending message should have thrown exception here.");
+      } catch (Exception e) {
+         assertEquals("Interceptor rejected message [condition = failed]", e.getMessage());
+      }
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(2);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNull(amqpMessage);
+      sender.close();
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testRejectMessageWithOutgoingInterceptor() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("msg" + 1);
+      message.setText("Test-Message");
+      sender.send(message);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+      server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() {
+         @Override
+         public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException {
+            latch.countDown();
+            return false;
+         }
+      });
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(2);
+      AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNull(amqpMessage);
+      assertEquals(latch.getCount(), 0);
+      sender.close();
+      receiver.close();
+      connection.close();
+   }
+
    private static final String ADDRESS = "address";
    private static final String MESSAGE_ID = "messageId";
    private static final String CORRELATION_ID = "correlationId";
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
new file mode 100644
index 0000000..cda06c0
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+
+public class MQTTRejectingInterceptorTest extends MQTTTestSupport {
+
+   @Rule
+   public ErrorCollector collector = new ErrorCollector();
+
+   @Test(timeout = 60000)
+   public void testRejectedMQTTMessage() throws Exception {
+      final String addressQueue = name.getMethodName();
+      final String msgText = "Test rejected message";
+
+      final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
+      initializeConnection(subscribeProvider);
+      subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
+
+      MQTTInterceptor incomingInterceptor = new MQTTInterceptor() {
+         @Override
+         public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
+            System.out.println("incoming");
+            if (packet.getClass() == MqttPublishMessage.class) {
+               return false;
+            } else {
+               return true;
+            }
+         }
+      };
+
+      server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
+
+      final MQTTClientProvider publishProvider = getMQTTClientProvider();
+      initializeConnection(publishProvider);
+      publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, false);
+      assertNull(subscribeProvider.receive(3000));
+
+      subscribeProvider.disconnect();
+      publishProvider.disconnect();
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java
index fad0e12..23def66 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java
@@ -38,8 +38,8 @@ public class StompWithInterceptorsTest extends StompTestBase {
    @Override
    public List<String> getIncomingInterceptors() {
       List<String> stompIncomingInterceptor = new ArrayList<>();
-      stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$IncomingStompInterceptor");
-      stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$CoreInterceptor");
+      stompIncomingInterceptor.add(IncomingStompInterceptor.class.getName());
+      stompIncomingInterceptor.add(CoreInterceptor.class.getName());
 
       return stompIncomingInterceptor;
    }
@@ -47,7 +47,7 @@ public class StompWithInterceptorsTest extends StompTestBase {
    @Override
    public List<String> getOutgoingInterceptors() {
       List<String> stompOutgoingInterceptor = new ArrayList<>();
-      stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$OutgoingStompInterceptor");
+      stompOutgoingInterceptor.add(OutgoingStompInterceptor.class.getName());
 
       return stompOutgoingInterceptor;
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java
new file mode 100644
index 0000000..736a56a
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StompWithRejectingInterceptorTest extends StompTestBase {
+
+   @Override
+   public List<String> getIncomingInterceptors() {
+      List<String> stompIncomingInterceptor = new ArrayList<>();
+      stompIncomingInterceptor.add(IncomingStompFrameRejectInterceptor.class.getName());
+
+      return stompIncomingInterceptor;
+   }
+
+   @Test
+   public void stompFrameInterceptor() throws Exception {
+      IncomingStompFrameRejectInterceptor.interceptedFrames.clear();
+
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame frame = conn.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+      conn.sendFrame(frame);
+      conn.disconnect();
+
+      assertTrue(Wait.waitFor(() -> IncomingStompFrameRejectInterceptor.interceptedFrames.size() == 3, 2000, 50));
+
+      List<String> incomingCommands = new ArrayList<>(4);
+      incomingCommands.add("CONNECT");
+      incomingCommands.add("SEND");
+      incomingCommands.add("DISCONNECT");
+
+      for (int i = 0; i < IncomingStompFrameRejectInterceptor.interceptedFrames.size(); i++) {
+         Assert.assertEquals(incomingCommands.get(i), IncomingStompFrameRejectInterceptor.interceptedFrames.get(i).getCommand());
+      }
+
+      Wait.assertFalse(() -> server.locateQueue(SimpleString.toSimpleString(getQueuePrefix() + getQueueName())).getMessageCount() > 0, 1000, 100);
+   }
+
+   public static class IncomingStompFrameRejectInterceptor implements StompFrameInterceptor {
+
+      static List<StompFrame> interceptedFrames = Collections.synchronizedList(new ArrayList<>());
+
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+         interceptedFrames.add(stompFrame);
+         if (stompFrame.getCommand() == Stomp.Commands.SEND) {
+            return false;
+         }
+         return true;
+      }
+   }
+}
\ No newline at end of file