[GitHub] michaelandrepearce closed pull request #2488: ARTEMIS-196 Implement Consumer Priority

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] michaelandrepearce closed pull request #2488: ARTEMIS-196 Implement Consumer Priority

GitBox
michaelandrepearce closed pull request #2488: ARTEMIS-196 Implement Consumer Priority
URL: https://github.com/apache/activemq-artemis/pull/2488
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
index 12c5f86321..0f7eb3c13b 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
@@ -32,6 +32,7 @@
    public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers";
    public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch";
    public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch";
+   public static final String CONSUMER_PRIORITY = "consumer-priority";
 
    private RoutingType routingType;
    private SimpleString filterString;
@@ -44,6 +45,7 @@
    private Boolean purgeOnNoConsumers;
    private Integer consumersBeforeDispatch;
    private Long delayBeforeDispatch;
+   private Integer consumerPriority;
 
    public void set(String key, String value) {
       if (key != null && value != null) {
@@ -69,6 +71,8 @@ public void set(String key, String value) {
             setConsumersBeforeDispatch(Integer.valueOf(value));
          } else if (key.equals(DELAY_BEFORE_DISPATCH)) {
             setDelayBeforeDispatch(Long.valueOf(value));
+         } else if (key.equals(CONSUMER_PRIORITY)) {
+            setConsumerPriority(Integer.valueOf(value));
          }
       }
    }
@@ -172,4 +176,13 @@ public QueueAttributes setDelayBeforeDispatch(Long delayBeforeDispatch) {
       return this;
    }
 
+   public Integer getConsumerPriority() {
+      return consumerPriority;
+   }
+
+   public QueueAttributes setConsumerPriority(Integer consumerPriority) {
+      this.consumerPriority = consumerPriority;
+      return this;
+   }
+
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index e7cb4cb1a3..97b3f87d4d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -470,6 +470,8 @@ public static String getDefaultHapolicyBackupStrategy() {
 
    public static final int DEFAULT_MAX_QUEUE_CONSUMERS = -1;
 
+   public static final int DEFAULT_CONSUMER_PRIORITY = 0;
+
    public static final boolean DEFAULT_EXCLUSIVE = false;
 
    public static final boolean DEFAULT_LAST_VALUE = false;
@@ -1310,6 +1312,10 @@ public static int getDefaultMaxQueueConsumers() {
       return DEFAULT_MAX_QUEUE_CONSUMERS;
    }
 
+   public static int getDefaultConsumerPriority() {
+      return DEFAULT_CONSUMER_PRIORITY;
+   }
+
    public static boolean getDefaultExclusive() {
       return DEFAULT_EXCLUSIVE;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index b400c911fd..14ca75cc59 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -879,6 +879,58 @@ ClientConsumer createConsumer(SimpleString queueName,
     *
     * @param queueName  name of the queue to consume messages from
     * @param filter     only messages which match this filter will be consumed
+    * @param priority   the consumer priority
+    * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
+    * @return a ClientConsumer
+    * @throws ActiveMQException if an exception occurs while creating the ClientConsumer
+    */
+   ClientConsumer createConsumer(SimpleString queueName,
+                                 SimpleString filter,
+                                 int priority,
+                                 boolean browseOnly) throws ActiveMQException;
+
+   /**
+    * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
+    * the given name.
+    * <p>
+    * If <code>browseOnly</code> is <code>true</code>, the ClientConsumer will receive the messages
+    * from the queue but they will not be consumed (the messages will remain in the queue). Note
+    * that paged messages will not be in the queue, and will therefore not be visible if
+    * {@code browseOnly} is {@code true}.
+    * <p>
+    * If <code>browseOnly</code> is <code>false</code>, the ClientConsumer will consume the messages
+    * from the queue and the messages will effectively be removed from the queue.
+    *
+    * @param queueName  name of the queue to consume messages from
+    * @param filter     only messages which match this filter will be consumed
+    * @param windowSize the consumer window size
+    * @param maxRate    the maximum rate to consume messages
+    * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
+    * @return a ClientConsumer
+    * @throws ActiveMQException if an exception occurs while creating the ClientConsumer
+    */
+   ClientConsumer createConsumer(SimpleString queueName,
+                                 SimpleString filter,
+                                 int windowSize,
+                                 int maxRate,
+                                 boolean browseOnly) throws ActiveMQException;
+
+
+   /**
+    * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
+    * the given name.
+    * <p>
+    * If <code>browseOnly</code> is <code>true</code>, the ClientConsumer will receive the messages
+    * from the queue but they will not be consumed (the messages will remain in the queue). Note
+    * that paged messages will not be in the queue, and will therefore not be visible if
+    * {@code browseOnly} is {@code true}.
+    * <p>
+    * If <code>browseOnly</code> is <code>false</code>, the ClientConsumer will consume the messages
+    * from the queue and the messages will effectively be removed from the queue.
+    *
+    * @param queueName  name of the queue to consume messages from
+    * @param filter     only messages which match this filter will be consumed
+    * @param priority   the consumer priority
     * @param windowSize the consumer window size
     * @param maxRate    the maximum rate to consume messages
     * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
@@ -887,6 +939,7 @@ ClientConsumer createConsumer(SimpleString queueName,
     */
    ClientConsumer createConsumer(SimpleString queueName,
                                  SimpleString filter,
+                                 int priority,
                                  int windowSize,
                                  int maxRate,
                                  boolean browseOnly) throws ActiveMQException;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 4946efb960..7f9ede3cf6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -70,6 +70,8 @@
 
    private final SimpleString filterString;
 
+   private final int priority;
+
    private final SimpleString queueName;
 
    private final boolean browseOnly;
@@ -141,6 +143,7 @@ public ClientConsumerImpl(final ClientSessionInternal session,
                              final ConsumerContext consumerContext,
                              final SimpleString queueName,
                              final SimpleString filterString,
+                             final int priority,
                              final boolean browseOnly,
                              final int initialWindow,
                              final int clientWindowSize,
@@ -157,6 +160,8 @@ public ClientConsumerImpl(final ClientSessionInternal session,
 
       this.filterString = filterString;
 
+      this.priority = priority;
+
       this.browseOnly = browseOnly;
 
       this.sessionContext = sessionContext;
@@ -562,6 +567,11 @@ public SimpleString getFilterString() {
       return filterString;
    }
 
+   @Override
+   public int getPriority() {
+      return priority;
+   }
+
    @Override
    public SimpleString getQueueName() {
       return queueName;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
index 819082134b..55d30e7e6b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
@@ -29,6 +29,8 @@
 
    SimpleString getFilterString();
 
+   int getPriority();
+
    boolean isBrowseOnly();
 
    void handleMessage(ClientMessageInternal message) throws Exception;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 5efaed682f..e8e39e74a6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -798,6 +798,14 @@ public ClientConsumer createConsumer(final SimpleString queueName,
       return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate, browseOnly);
    }
 
+   @Override
+   public ClientConsumer createConsumer(final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final int priority,
+                                        final boolean browseOnly) throws ActiveMQException {
+      return createConsumer(queueName, filterString, priority, consumerWindowSize, consumerMaxRate, browseOnly);
+   }
+
    @Override
    public ClientConsumer createConsumer(final SimpleString queueName,
                                         final boolean browseOnly) throws ActiveMQException {
@@ -821,6 +829,15 @@ public boolean isWritable(ReadyListener callback) {
       return sessionContext.isWritable(callback);
    }
 
+   @Override
+   public ClientConsumer createConsumer(final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final int windowSize,
+                                        final int maxRate,
+                                        final boolean browseOnly) throws ActiveMQException {
+      return createConsumer(queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), windowSize, maxRate, browseOnly);
+   }
+
    /**
     * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on
     * the remoting thread).
@@ -834,10 +851,11 @@ public boolean isWritable(ReadyListener callback) {
    @Override
    public ClientConsumer createConsumer(final SimpleString queueName,
                                         final SimpleString filterString,
+                                        final int priority,
                                         final int windowSize,
                                         final int maxRate,
                                         final boolean browseOnly) throws ActiveMQException {
-      return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly);
+      return internalCreateConsumer(queueName, filterString, priority, windowSize, maxRate, browseOnly);
    }
 
    @Override
@@ -1931,12 +1949,13 @@ public String toString() {
     */
    private ClientConsumer internalCreateConsumer(final SimpleString queueName,
                                                  final SimpleString filterString,
+                                                 final int priority,
                                                  final int windowSize,
                                                  final int maxRate,
                                                  final boolean browseOnly) throws ActiveMQException {
       checkClosed();
 
-      ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
+      ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
 
       addConsumer(consumer);
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index ccf10ab1ce..bfe8ec0287 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -367,6 +367,7 @@ public boolean isWritable(ReadyListener callback) {
    @Override
    public ClientConsumerInternal createConsumer(SimpleString queueName,
                                                 SimpleString filterString,
+                                                int priority,
                                                 int windowSize,
                                                 int maxRate,
                                                 int ackBatchSize,
@@ -377,7 +378,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
 
       ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
 
-      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
+      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true);
 
       SessionQueueQueryResponseMessage queueInfo;
 
@@ -392,7 +393,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
       // The value we send is just a hint
       final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
 
-      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
    }
 
    @Override
@@ -875,7 +876,7 @@ public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal,
          sendPacketWithoutLock(sessionChannel, createQueueRequest);
       }
 
-      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), false);
+      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.getPriority(), consumerInternal.isBrowseOnly(), false);
 
       sendPacketWithoutLock(sessionChannel, createConsumerRequest);
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
index e07b50ca2f..a4f2acebb8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
@@ -25,6 +26,8 @@
 
    private SimpleString filterString;
 
+   private int priority;
+
    private boolean browseOnly;
 
    private boolean requiresResponse;
@@ -32,6 +35,7 @@
    public SessionCreateConsumerMessage(final long id,
                                        final SimpleString queueName,
                                        final SimpleString filterString,
+                                       final int priority,
                                        final boolean browseOnly,
                                        final boolean requiresResponse) {
       super(SESS_CREATECONSUMER);
@@ -39,6 +43,7 @@ public SessionCreateConsumerMessage(final long id,
       this.id = id;
       this.queueName = queueName;
       this.filterString = filterString;
+      this.priority = priority;
       this.browseOnly = browseOnly;
       this.requiresResponse = requiresResponse;
    }
@@ -52,6 +57,7 @@ public String toString() {
       StringBuffer buff = new StringBuffer(getParentString());
       buff.append(", queueName=" + queueName);
       buff.append(", filterString=" + filterString);
+      buff.append(", priority=" + priority);
       buff.append(", id=" + id);
       buff.append(", browseOnly=" + browseOnly);
       buff.append(", requiresResponse=" + requiresResponse);
@@ -67,6 +73,10 @@ public SimpleString getFilterString() {
       return filterString;
    }
 
+   public int getPriority() {
+      return priority;
+   }
+
    public boolean isBrowseOnly() {
       return browseOnly;
    }
@@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
       this.filterString = filterString;
    }
 
+   public void setPriority(int priority) { // int, matching the field, the constructor parameter and the writeInt/readInt encoding
+      this.priority = priority;
+   }
+
    public void setBrowseOnly(boolean browseOnly) {
       this.browseOnly = browseOnly;
    }
@@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeNullableSimpleString(filterString);
       buffer.writeBoolean(browseOnly);
       buffer.writeBoolean(requiresResponse);
+      buffer.writeInt(priority);
    }
 
    @Override
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
       filterString = buffer.readNullableSimpleString();
       browseOnly = buffer.readBoolean();
       requiresResponse = buffer.readBoolean();
+      if (buffer.readableBytes() > 0) {
+         priority = buffer.readInt();
+      } else {
+         priority = ActiveMQDefaultConfiguration.getDefaultConsumerPriority();
+      }
    }
 
    @Override
@@ -113,6 +133,7 @@ public int hashCode() {
       result = prime * result + (browseOnly ? 1231 : 1237);
       result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
       result = prime * result + (int) (id ^ (id >>> 32));
+      result = prime * result + priority;
       result = prime * result + ((queueName == null) ? 0 : queueName.hashCode());
       result = prime * result + (requiresResponse ? 1231 : 1237);
       return result;
@@ -134,6 +155,8 @@ public boolean equals(Object obj) {
             return false;
       } else if (!filterString.equals(other.filterString))
          return false;
+      if (priority != other.priority)
+         return false;
       if (id != other.id)
          return false;
       if (queueName == null) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index e25441b012..93352049be 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -314,6 +314,7 @@ public abstract void sendACK(boolean individual,
 
    public abstract ClientConsumerInternal createConsumer(SimpleString queueName,
                                                          SimpleString filterString,
+                                                         int priority,
                                                          int windowSize,
                                                          int maxRate,
                                                          int ackBatchSize,
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 693ab84ead..60f5d8ba09 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -51,6 +51,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.QueueAttributes;
@@ -710,7 +711,7 @@ private ActiveMQMessageConsumer internalCreateSharedConsumer(final ActiveMQDesti
             }
          }
 
-         consumer = session.createConsumer(queueName, null, false);
+         consumer = createClientConsumer(dest, queueName, null);
 
          ActiveMQMessageConsumer jbc = new ActiveMQMessageConsumer(options, connection, this, consumer, false, dest, selectorString, autoDeleteQueueName);
 
@@ -779,7 +780,7 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
 
             connection.addKnownDestination(dest.getSimpleAddress());
 
-            consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false);
+            consumer = createClientConsumer(dest, null, coreFilterString);
          } else {
             AddressQuery response = session.addressQuery(dest.getSimpleAddress());
 
@@ -804,8 +805,7 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
 
                createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
 
-               consumer = session.createConsumer(queueName, null, false);
-
+               consumer = createClientConsumer(dest, queueName, null);
                autoDeleteQueueName = queueName;
             } else {
                // Durable sub
@@ -860,7 +860,7 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
                   }
                }
 
-               consumer = session.createConsumer(queueName, null, false);
+               consumer = createClientConsumer(dest, queueName, null);
             }
          }
 
@@ -874,6 +874,12 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
       }
    }
 
+   private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
+      QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+      int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
+      return session.createConsumer(queueName == null ? destination.getSimpleAddress() : queueName, coreFilterString, priority, false);
+   }
+
    public void ackAllConsumers() throws JMSException {
       checkClosed();
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 1ca4410a4d..432b797a7b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@@ -84,6 +86,8 @@
 
    private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
 
+   private static final Symbol PRIORITY = Symbol.getSymbol("priority");
+
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
 
    private final AMQPConnectionCallback protonSPI;
@@ -223,7 +227,9 @@ public Object createSender(ProtonServerSenderContext protonSender,
 
       filter = SelectorTranslator.convertToActiveMQFilterString(filter);
 
-      ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null);
+      int priority = getPriority(protonSender.getSender().getRemoteProperties());
+
+      ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), priority, browserOnly, false, null);
 
       // AMQP handles its own flow control for when it's started
       consumer.setStarted(true);
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender,
       return consumer;
    }
 
+   private int getPriority(Map<Symbol, Object> properties) {
+      Object value = properties == null ? null : properties.get(PRIORITY); // a peer may encode "priority" as any integer width, so do not cast to Integer
+      return value instanceof Number ? ((Number) value).intValue() : ActiveMQDefaultConfiguration.getDefaultConsumerPriority(); // absent or non-numeric -> default
+   }
+
    public void startSender(Object brokerConsumer) throws Exception {
       ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
       // flow control is done at proton
diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
index 235d699293..4ae4968193 100644
--- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
+++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
@@ -78,6 +78,7 @@ protected CreateSessionMessage newCreateSession(String username,
    @Override
    public ClientConsumerInternal createConsumer(SimpleString queueName,
                                                 SimpleString filterString,
+                                                int priority,
                                                 int windowSize,
                                                 int maxRate,
                                                 int ackBatchSize,
@@ -88,7 +89,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
 
       ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
 
-      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
+      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true);
 
       SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
 
@@ -96,7 +97,7 @@ public ClientConsumerInternal createConsumer(SimpleString queueName,
       // could be overridden on the queue settings
       // The value we send is just a hint
 
-      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
    }
 
 }
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 45d9fa1ea1..c35bc644ce 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -137,11 +137,10 @@ public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, lo
       }
 
       SimpleString destinationName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
-
       if (openwireDestination.isTopic()) {
          SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
 
-         serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
+         serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
          //only advisory topic consumers need this.
          ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
@@ -151,7 +150,7 @@ public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, lo
          } catch (ActiveMQQueueExistsException e) {
             // ignore
          }
-         serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.isBrowser(), false, -1);
+         serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.getPriority(), info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
          AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString());
          if (addrSettings != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 3b68d8b837..b2393c9b0f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -323,7 +323,7 @@ private void slowPacketHandler(final Packet packet) {
                case SESS_CREATECONSUMER: {
                   SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
                   requiresResponse = request.isRequiresResponse();
-                  session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
+                  session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.getPriority(), request.isBrowseOnly(), true, null);
                   if (requiresResponse) {
                      // We send back queue information on the queue as a response- this allows the queue to
                      // be automatically recreated on failover
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 6df48890da..04df32194e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -21,7 +21,7 @@
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
-public interface Consumer {
+public interface Consumer extends PriorityAware {
 
    /**
     *
@@ -80,4 +80,5 @@ default boolean supportsDirectDelivery() {
 
    /** an unique sequential ID for this consumer */
    long sequentialID();
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PriorityAware.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PriorityAware.java
new file mode 100644
index 0000000000..c719d84726
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PriorityAware.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+
+public interface PriorityAware {
+   /** Returns this object's priority; unless overridden, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(). */
+   default int getPriority() {
+      return ActiveMQDefaultConfiguration.getDefaultConsumerPriority();
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 37442b2c42..3a26ff8d5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -104,7 +104,18 @@
 
    void addCloseable(Closeable closeable);
 
+   /**
+    * Creates a consumer on {@code queueName}, passing an explicit consumer {@code priority}.
+    */
+   ServerConsumer createConsumer(long consumerID,
+                                 SimpleString queueName,
+                                 SimpleString filterString,
+                                 int priority,
+                                 boolean browseOnly,
+                                 boolean supportLargeMessage,
+                                 Integer credits) throws Exception;
+
    /**
     * To be used by protocol heads that needs to control the transaction outside the session context.
     */
    void resetTX(Transaction transaction);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
new file mode 100644
index 0000000000..5c755149e2
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.util.Collection;
+import java.util.Set;
+
+public interface QueueConsumers<T extends PriorityAware> extends Collection<T> {
+   /** Returns the set of priority values of the levels held by this collection. */
+   Set<Integer> getPriorites();
+   /** Returns whether the collection's own resettable iteration (see reset()) has another element. */
+   boolean hasNext();
+   /** Returns the next element of the collection's own resettable iteration. */
+   T next();
+   /** Restarts the collection's own resettable iteration. */
+   QueueConsumers<T> reset();
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
new file mode 100644
index 0000000000..496d9bccc0
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers. It is modelled as a multi-level (priority) variant of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList, but we need to access the internal array structure,
+ * which is privileged to package java.util.concurrent. As such, much of Level is taken from there.
+ *
+ * Modifications, like in CopyOnWriteArrayList, are single threaded via a single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
+ *
+ * There can only be one resettable iterable view; this is exposed at the top level,
+ *     and is intended for use in QueueImpl only.
+ * All other iterators are not resettable and are created on calling iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
+ *
+ * @param <T> The type this class may hold. This is generic, as it can be anything that extends PriorityAware,
+ *         but the intent is that this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
+
+   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
+
+   private volatile Level<T>[] levels;
+   private volatile int size;
+
+   private void setArray(Level<T>[] array) {
+      this.levels = array;
+   }
+
+   private Level<T>[] getArray() {
+      return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+      levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static <T> Level<T>[] newLevelArrayInstance(int length) {
+      return (Level<T>[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+      return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+      return size() == 0;
+   }
+
+   @Override
+   public Set<Integer> getPriorites() {
+      Level<T>[] levels = getArray();
+      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator<T> iterator() {
+      return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+      return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+      return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers<T> reset() {
+      iterator.reset();
+      return this;
+   }
+
+   @Override
+   public void forEach(Consumer<? super T> action) {
+      Objects.requireNonNull(action);
+      Level<T>[] current = getArray();
+      int len = current.length;
+      for (int i = 0; i < len; ++i) {
+         current[i].forEach(action);
+      }
+   }
+
+   private Level<T> getLevel(int level, boolean createIfMissing) { // find, or optionally insert, the Level for a priority
+      Level<T>[] current = getArray();
+      int low = 0;
+      int high = current.length - 1;
+      // Binary search: the levels array is kept sorted by descending level value.
+      while (low <= high) {
+         int mid = (low + high) >>> 1;
+         Level<T> midVal = current[mid];
+         // A larger level value sorts earlier, so the target can only be at a higher index.
+         if (midVal.level() > level)
+            low = mid + 1;
+         else if (midVal.level() < level)
+            high = mid - 1;
+         else
+            return midVal; //key found
+      }
+      // Not found: 'low' is now the insertion index that preserves the descending order.
+      if (createIfMissing) {
+         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
+         if (low > 0) {
+            System.arraycopy(current, 0, newLevels, 0, low);
+         }
+         if (current.length - low > 0) {
+            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
+         }
+         newLevels[low] = new Level<T>(level);
+         setArray(newLevels); // publish a new array; the previous one is never modified
+         return newLevels[low];
+      }
+      return null; // level absent and creation was not requested
+   }
+
+   @Override
+   public synchronized boolean add(T t) {
+      boolean result = addInternal(t);
+      calcSize();
+      return result;
+   }
+
+   private boolean addInternal(T t) {
+      if (t == null) return false;
+      Level<T> level = getLevel(t.getPriority(), true);
+      return level.add(t);
+   }
+
+   @Override
+   public boolean remove(Object o) {
+      return o instanceof PriorityAware && remove((PriorityAware) o);
+   }
+
+   public synchronized boolean remove(PriorityAware priorityAware) {
+      boolean result = removeInternal(priorityAware);
+      calcSize();
+      return result;
+   }
+
+   private boolean removeInternal(PriorityAware priorityAware) {
+      if ( priorityAware == null) return false;
+      Level<T> level = getLevel(priorityAware.getPriority(), false);
+      boolean result = level != null && level.remove(priorityAware);
+      if (level != null && level.size() == 0) {
+         removeLevel(level.level);
+      }
+      return result;
+   }
+
+   private Level<T> removeLevel(int level) { // drop the Level with this priority value, if present
+      Level<T>[] current = getArray();
+      int len = current.length;
+      int low = 0;
+      int high = len - 1;
+      // Binary search over levels ordered by descending level value (same ordering as getLevel).
+      while (low <= high) {
+         int mid = (low + high) >>> 1;
+         Level<T> midVal = current[mid];
+         // A larger level value sorts earlier, so the target can only be at a higher index.
+         if (midVal.level() > level)
+            low = mid + 1;
+         else if (midVal.level() < level)
+            high = mid - 1;
+         else {
+            Level<T>[] newLevels = newLevelArrayInstance(len - 1);
+            System.arraycopy(current, 0, newLevels, 0, mid); // entries before the match
+            System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1); // entries after it
+            setArray(newLevels); // publish a new array; the previous one is never modified
+            return midVal; //key found
+         }
+      }
+      return null; // no level with this value
+   }
+
+   @Override
+   public boolean containsAll(Collection<?> c) {
+      Objects.requireNonNull(c);
+      for (Object e : c)
+         if (!contains(e))
+            return false;
+      return true;
+   }
+
+   @Override
+   public synchronized boolean addAll(Collection<? extends T> c) {
+      Objects.requireNonNull(c);
+      boolean modified = false;
+      for (T e : c)
+         if (addInternal(e))
+            modified = true;
+      calcSize();
+      return modified;
+   }
+
+   @Override
+   public synchronized boolean removeAll(Collection<?> c) {
+      Objects.requireNonNull(c);
+      boolean modified = false;
+      for (Object o : c) {
+         if (remove(o)) {
+            modified = true;
+         }
+      }
+      calcSize();
+      return modified;
+   }
+
+   @Override
+   public synchronized boolean retainAll(Collection<?> c) {
+      Objects.requireNonNull(c);
+      boolean modified = false;
+      // Iterate a snapshot: removeLevel() publishes a new levels array. Levels emptied here
+      // are dropped, as removeInternal() does, so getPriorites() never reports an empty one.
+      for (Level<T> level : getArray()) {
+         modified |= level.retainAll(c);
+         if (level.isEmpty()) removeLevel(level.level());
+      }
+      calcSize();
+      return modified;
+   }
+
+   @Override
+   public synchronized void clear() {
+      // Empty every level, then publish an empty levels array: as in removeInternal(), no
+      // empty level is left behind for getPriorites() to report.
+      for (Level<T> level : getArray()) level.clear();
+      setArray(newLevelArrayInstance(0));
+      calcSize();
+   }
+
+
+
+   @Override
+   public boolean contains(Object o) {
+      return o instanceof PriorityAware && contains((PriorityAware) o);
+   }
+
+   public boolean contains(PriorityAware priorityAware) {
+      if (priorityAware == null) return false;
+      Level<T> level = getLevel(priorityAware.getPriority(), false);
+      return level != null && level.contains(priorityAware);
+   }
+
+   private void calcSize() {
+      Level<T>[] current = getArray();
+      int size = 0;
+      for (Level<T> level : current) {
+         size += level.size();
+      }
+      this.size = size;
+   }
+
+   private static class QueueConsumersIterator<T extends PriorityAware> implements ResetableIterator<T> {
+
+      private final QueueConsumersImpl<T> queueConsumers;
+      private final boolean resetable;
+      private Level<T>[] levels;
+      int level = -1;
+      private ResetableIterator<T> currentIterator;
+
+      private QueueConsumersIterator(QueueConsumersImpl<T> queueConsumers, boolean resetable) {
+         this.queueConsumers = queueConsumers;
+         this.levels = queueConsumers.getArray();
+         this.resetable = resetable;
+
+      }
+
+      @Override
+      public boolean hasNext() {
+         while (true) {
+            if (currentIterator != null) {
+               if (currentIterator.hasNext()) {
+                  return true;
+               }
+            }
+            int nextLevel = level + 1;
+            if (levels != null && nextLevel < levels.length) {
+               moveToLevel(nextLevel);
+            } else {
+               return false;
+            }
+         }
+      }
+
+      @Override
+      public T next() { // next element in level order; returns null (does not throw) when exhausted
+         while (true) {
+            if (currentIterator != null) {
+               if (currentIterator.hasNext()) {
+                  return currentIterator.next();
+               }
+            }
+            int nextLevel = level + 1; // current level is exhausted (or none entered yet)
+            if (levels != null && nextLevel < levels.length) {
+               moveToLevel(nextLevel);
+            } else {
+               return null; // no further levels in this snapshot
+            }
+         }
+      }
+
+      private void moveToLevel(int level) {
+         Level<T> level0 = levels[level];
+         if (resetable) {
+            currentIterator = level0.resetableIterator().reset();
+         } else {
+            currentIterator = level0.iterator();
+         }
+         this.level = level;
+      }
+
+      @Override
+      public ResetableIterator<T> reset() {
+         if (!resetable) {
+            throw new IllegalStateException("Iterator is not resetable");
+         }
+         levels = queueConsumers.getArray();
+         level = -1;
+         currentIterator = null;
+         return this;
+      }
+   }
+
+   /**
+    * This represents a single priority level and is modeled on {@link java.util.concurrent.CopyOnWriteArrayList}.
+    *
+    * @param <E> the type of element held at this level
+    */
+   private static class Level<E> {
+
+      /** The array, accessed only via getArray/setArray. */
+      private transient volatile Object[] array;
+
+      private transient volatile ResetableIterator<E> resetableIterator;
+
+      private final int level;
+
+      /**
+       * Gets the current backing array. Unlike the CopyOnWriteArrayList method this is
+       * modeled on, it is private to Level.
+       */
+      private Object[] getArray() {
+         return array;
+      }
+
+      /**
+       * Sets the array and replaces the shared resetable iterator with a new one over it.
+       */
+      private void setArray(Object[] a) {
+         array = a;
+         resetableIterator = new LevelResetableIterator<>(a);
+      }
+
+      /**
+       * Creates an empty list.
+       */
+      private Level(int level) {
+         setArray(new Object[0]);
+         this.level = level;
+      }
+
+      public int level() {
+         return level;
+      }
+
+      public void forEach(Consumer<? super E> action) {
+         if (action == null) throw new NullPointerException();
+         Object[] elements = getArray();
+         for (Object element : elements) {
+            @SuppressWarnings("unchecked") E e = (E) element;
+            action.accept(e);
+         }
+      }
+
+      /**
+       * Returns the number of elements in this list.
+       *
+       * @return the number of elements in this list
+       */
+      public int size() {
+         return getArray().length;
+      }
+
+      /**
+       * Returns {@code true} if this list contains no elements.
+       *
+       * @return {@code true} if this list contains no elements
+       */
+      public boolean isEmpty() {
+         return size() == 0;
+      }
+
+      /**
+       * Returns {@code true} if this list contains the specified element.
+       * More formally, returns {@code true} if and only if this list contains
+       * at least one element {@code e} such that
+       * <tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>.
+       *
+       * @param o element whose presence in this list is to be tested
+       * @return {@code true} if this list contains the specified element
+       */
+      public boolean contains(Object o) {
+         Object[] elements = getArray();
+         return indexOf(o, elements, 0, elements.length) >= 0;
+      }
+
+      /**
+       * Tests for equality, coping with nulls.
+       */
+      private static boolean eq(Object o1, Object o2) {
+         return (o1 == null) ? o2 == null : o1.equals(o2);
+      }
+
+      /**
+       * static version of indexOf, to allow repeated calls without
+       * needing to re-acquire array each time.
+       * @param o element to search for
+       * @param elements the array
+       * @param index first index to search
+       * @param fence one past last index to search
+       * @return index of element, or -1 if absent
+       */
+      private static int indexOf(Object o, Object[] elements,
+                           int index, int fence) {
+         if (o == null) {
+            for (int i = index; i < fence; i++)
+               if (elements[i] == null)
+                  return i;
+         } else {
+            for (int i = index; i < fence; i++)
+               if (o.equals(elements[i]))
+                  return i;
+         }
+         return -1;
+      }
+
+      /**
+       * Appends the specified element to the end of this list.
+       *
+       * @param e element to be appended to this list
+       * @return {@code true} (as specified by {@link Collection#add})
+       */
+      public boolean add(E e) {
+         Object[] elements = getArray();
+         int len = elements.length;
+         Object[] newElements = Arrays.copyOf(elements, len + 1);
+         newElements[len] = e;
+         setArray(newElements);
+         return true;
+      }
+
+      /**
+       * Removes the first occurrence of the specified element from this list,
+       * if it is present.  If this list does not contain the element, it is
+       * unchanged.  More formally, removes the element with the lowest index
+       * {@code i} such that
+       * <tt>(o==null&nbsp;?&nbsp;get(i)==null&nbsp;:&nbsp;o.equals(get(i)))</tt>
+       * (if such an element exists).  Returns {@code true} if this list
+       * contained the specified element (or equivalently, if this list
+       * changed as a result of the call).
+       *
+       * @param o element to be removed from this list, if present
+       * @return {@code true} if this list contained the specified element
+       */
+      public boolean remove(Object o) {
+         Object[] snapshot = getArray();
+         int index = indexOf(o, snapshot, 0, snapshot.length);
+         return (index >= 0) && remove(o, snapshot, index);
+      }
+
+      /**
+       * A version of remove(Object) using the strong hint that given
+       * recent snapshot contains o at the given index.
+       */
+      private boolean remove(Object o, Object[] snapshot, int index) {
+         Object[] current = getArray();
+         int len = current.length;
+         if (snapshot != current)
+            findIndex: {
+               int prefix = Math.min(index, len);
+               for (int i = 0; i < prefix; i++) {
+                  if (current[i] != snapshot[i] && eq(o, current[i])) {
+                     index = i;
+                     break findIndex;
+                  }
+               }
+               if (index >= len)
+                  return false;
+               if (current[index] == o)
+                  break findIndex;
+               index = indexOf(o, current, index, len);
+               if (index < 0)
+                  return false;
+            }
+         Object[] newElements = new Object[len - 1];
+         System.arraycopy(current, 0, newElements, 0, index);
+         System.arraycopy(current, index + 1,
+               newElements, index,
+               len - index - 1);
+         setArray(newElements);
+         return true;
+      }
+
+      /**
+       * Retains only the elements in this list that are contained in the
+       * specified collection.  In other words, removes from this list all of
+       * its elements that are not contained in the specified collection.
+       *
+       * @param c collection containing elements to be retained in this list
+       * @return {@code true} if this list changed as a result of the call
+       * @throws ClassCastException if the class of an element of this list
+       *       is incompatible with the specified collection
+       *       (<a href="../Collection.html#optional-restrictions">optional</a>)
+       * @throws NullPointerException if this list contains a null element and the
+       *       specified collection does not permit null elements
+       *       (<a href="../Collection.html#optional-restrictions">optional</a>),
+       *       or if the specified collection is null
+       * @see #remove(Object)
+       */
+      public boolean retainAll(Collection<?> c) {
+         if (c == null) throw new NullPointerException();
+         Object[] elements = getArray();
+         int len = elements.length;
+         if (len != 0) {
+            // temp array holds those elements we know we want to keep
+            int newlen = 0;
+            Object[] temp = new Object[len];
+            for (int i = 0; i < len; ++i) {
+               Object element = elements[i];
+               if (c.contains(element))
+                  temp[newlen++] = element;
+            }
+            if (newlen != len) {
+               setArray(Arrays.copyOf(temp, newlen));
+               return true;
+            }
+         }
+         return false;
+      }
+
+      /**
+       * Removes all of the elements from this list.
+       * The list will be empty after this call returns.
+       */
+      public void clear() {
+         setArray(new Object[0]);
+      }
+
+      private ResetableIterator<E> resetableIterator() {
+         return resetableIterator;
+      }
+
+      public ResetableIterator<E> iterator() {
+         return new LevelResetableIterator<>(getArray());
+      }
+
+      private static class LevelResetableIterator<T> implements ResetableIterator<T> {
+         // Circular iterator over a fixed array: each pass begun by reset() runs from cursor back round to cursor.
+         private final Object[] array;
+         private int cursor = 0; // index of the next element to return; NOT rewound by reset()
+         private int endPos = -1; // cursor value at which the current pass is complete
+         private boolean hasNext;
+
+         private LevelResetableIterator(Object[] array) {
+            this.array = array;
+            reset();
+         }
+
+         @Override
+         public ResetableIterator<T> reset() {
+            endPos = cursor; // the new pass ends once the cursor wraps back to where it starts now
+            hasNext = array.length > 0;
+            return this;
+         }
+
+         @Override
+         public boolean hasNext() {
+            return hasNext;
+         }
+
+         @Override
+         public T next() {
+            if (!hasNext) {
+               throw new IllegalStateException(); // pass finished (or empty array); reset() starts a new pass
+            }
+            @SuppressWarnings("unchecked") T result = (T) array[cursor];
+            cursor++;
+            if (cursor == array.length) {
+               cursor = 0; // wrap around to the start of the array
+            }
+            if (cursor == endPos) {
+               hasNext = false; // back at the pass's starting position
+            }
+            return result;
+         }
+      }
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 292cfc1e52..31973763ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,9 +30,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +43,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Collectors;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
@@ -72,6 +75,7 @@
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.PriorityAware;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -184,11 +188,6 @@
 
    private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
 
-   // used to control if we should recalculate certain positions inside deliverAsync
-   private volatile boolean consumersChanged = true;
-
-   private final List<ConsumerHolder> consumerList = new CopyOnWriteArrayList<>();
-
    private final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
    private AtomicLong messagesAdded = new AtomicLong(0);
@@ -235,14 +234,13 @@
    private final AtomicInteger consumersCount = new AtomicInteger();
 
    private volatile long consumerRemovedTimestamp = -1;
-
-   private final Set<Consumer> consumerSet = new HashSet<>();
+   private final QueueConsumers<ConsumerHolder<Consumer>> consumers = new QueueConsumersImpl<>();
 
    private final Map<SimpleString, Consumer> groups = new HashMap<>();
 
-   private volatile SimpleString expiryAddress;
+   private volatile Consumer exclusiveConsumer;
 
-   private int pos;
+   private volatile SimpleString expiryAddress;
 
    private final ArtemisExecutor executor;
 
@@ -327,7 +325,7 @@ public String debug() {
 
       out.println("queueMemorySize=" + queueMemorySize);
 
-      for (ConsumerHolder holder : consumerList) {
+      for (ConsumerHolder holder : consumers) {
          out.println("consumer: " + holder.consumer.debug());
       }
 
@@ -561,6 +559,9 @@ public boolean isExclusive() {
    @Override
    public synchronized void setExclusive(boolean exclusive) {
       this.exclusive = exclusive;
+      if (!exclusive) {
+         exclusiveConsumer = null;
+      }
    }
 
    @Override
@@ -1022,22 +1023,19 @@ public void addConsumer(final Consumer consumer) throws Exception {
       enterCritical(CRITICAL_CONSUMER);
       try {
          synchronized (this) {
-            if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumersCount.get() >= maxConsumers) {
+            if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) {
                throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
             }
 
-            consumersChanged = true;
-
             if (!consumer.supportsDirectDelivery()) {
                this.supportsDirectDeliver = false;
             }
 
             cancelRedistributor();
-
-            consumerList.add(new ConsumerHolder(consumer));
-
-            if (consumerSet.add(consumer)) {
-               int currentConsumerCount = consumersCount.incrementAndGet();
+            ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer);
+            if (!consumers.contains(newConsumerHolder)) {
+               consumers.add(newConsumerHolder);
+               int currentConsumerCount = consumers.size();
                if (delayBeforeDispatch >= 0) {
                   dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
                }
@@ -1065,33 +1063,33 @@ public void removeConsumer(final Consumer consumer) {
       enterCritical(CRITICAL_CONSUMER);
       try {
          synchronized (this) {
-            consumersChanged = true;
 
-            for (ConsumerHolder holder : consumerList) {
+            boolean consumerRemoved = false;
+            for (ConsumerHolder holder : consumers) {
                if (holder.consumer == consumer) {
                   if (holder.iter != null) {
                      holder.iter.close();
                   }
-                  consumerList.remove(holder);
+                  consumers.remove(holder);
+                  consumerRemoved = true;
                   break;
                }
             }
 
             this.supportsDirectDeliver = checkConsumerDirectDeliver();
 
-            if (pos > 0 && pos >= consumerList.size()) {
-               pos = consumerList.size() - 1;
-            }
-
-            if (consumerSet.remove(consumer)) {
-               int currentConsumerCount = consumersCount.decrementAndGet();
+            if (consumerRemoved) {
                consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
-               boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(currentConsumerCount != 0));
+               boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(consumers.size() != 0));
                if (stopped) {
                   dispatchStartTimeUpdater.set(this, -1);
                }
             }
 
+            if (consumer == exclusiveConsumer) {
+               exclusiveConsumer = null;
+            }
+
             LinkedList<SimpleString> groupsToRemove = null;
 
             for (SimpleString groupID : groups.keySet()) {
@@ -1124,7 +1122,7 @@ public void removeConsumer(final Consumer consumer) {
 
    private boolean checkConsumerDirectDeliver() {
       boolean supports = true;
-      for (ConsumerHolder consumerCheck : consumerList) {
+      for (ConsumerHolder consumerCheck : consumers) {
          if (!consumerCheck.consumer.supportsDirectDelivery()) {
             supports = false;
          }
@@ -1147,7 +1145,7 @@ public synchronized void addRedistributor(final long delay) {
       }
 
       if (delay > 0) {
-         if (consumerSet.isEmpty()) {
+         if (consumers.isEmpty()) {
             DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
 
             redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
@@ -1184,7 +1182,7 @@ protected void finalize() throws Throwable {
 
    @Override
    public int getConsumerCount() {
-      return consumersCount.get();
+      return consumers.size();
    }
 
    @Override
@@ -1193,8 +1191,8 @@ public long getConsumerRemovedTimestamp() {
    }
 
    @Override
-   public synchronized Set<Consumer> getConsumers() {
-      return new HashSet<>(consumerSet);
+   public Set<Consumer> getConsumers() {
+      return this.consumers.stream().map(ConsumerHolder::consumer).collect(Collectors.toSet());
    }
 
    @Override
@@ -1219,7 +1217,7 @@ public synchronized int getGroupCount() {
 
    @Override
    public boolean hasMatchingConsumer(final Message message) {
-      for (ConsumerHolder holder : consumerList) {
+      for (ConsumerHolder holder : consumers) {
          Consumer consumer = holder.consumer;
 
          if (consumer instanceof Redistributor) {
@@ -1367,7 +1365,7 @@ public long getDurableScheduledSize() {
    @Override
    public Map<String, List<MessageReference>> getDeliveringMessages() {
 
-      List<ConsumerHolder> consumerListClone = cloneConsumersList();
+      Collection<ConsumerHolder> consumerListClone = cloneConsumers();
 
       Map<String, List<MessageReference>> mapReturn = new HashMap<>();
 
@@ -1381,6 +1379,19 @@ public long getDurableScheduledSize() {
       return mapReturn;
    }
 
+
+   private Collection<ConsumerHolder> cloneConsumers() {
+      Collection<ConsumerHolder> consumersClone;
+      synchronized (this) {
+         if (redistributor == null) {
+            consumersClone = new ArrayList<>(consumers);
+         } else {
+            consumersClone = Collections.singletonList(redistributor);
+         }
+      }
+      return consumersClone;
+   }
+
    @Override
    public int getDeliveringCount() {
       return deliveringMetrics.getMessageCount();
@@ -1856,7 +1867,7 @@ public void deleteQueue(boolean removeConsumers) throws Exception {
          postOffice.removeBinding(name, tx, true);
 
          if (removeConsumers) {
-            for (ConsumerHolder consumerHolder : consumerList) {
+            for (ConsumerHolder consumerHolder : consumers) {
                consumerHolder.consumer.disconnect();
             }
          }
@@ -2228,7 +2239,7 @@ public synchronized int changeReferencesPriority(final Filter filter, final byte
 
    @Override
    public synchronized void resetAllIterators() {
-      for (ConsumerHolder holder : this.consumerList) {
+      for (ConsumerHolder holder : this.consumers) {
          holder.resetIterator();
       }
       if (redistributor != null) {
@@ -2409,14 +2420,10 @@ private void deliver() {
       // Either the iterator is empty or the consumer is busy
       int noDelivery = 0;
 
-      int size = 0;
-
-      int endPos = -1;
-
       int handled = 0;
 
       long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
-
+      consumers.reset();
       while (true) {
          if (handled == MAX_DELIVERIES_IN_LOOP) {
             // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
@@ -2454,21 +2461,11 @@ private void deliver() {
 
             ConsumerHolder<? extends Consumer> holder;
             if (redistributor == null) {
-
-               if (endPos < 0 || consumersChanged) {
-                  consumersChanged = false;
-
-                  size = consumerList.size();
-
-                  endPos = pos - 1;
-
-                  if (endPos < 0) {
-                     endPos = size - 1;
-                     noDelivery = 0;
-                  }
+               if (consumers.hasNext()) {
+                  holder = consumers.next();
+               } else {
+                  break;
                }
-
-               holder = consumerList.get(pos);
             } else {
                holder = redistributor;
             }
@@ -2497,7 +2494,7 @@ private void deliver() {
 
 
                   handled++;
-
+                  consumers.reset();
                   continue;
                }
 
@@ -2505,37 +2502,28 @@ private void deliver() {
                   logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
                }
 
-               // If a group id is set, then this overrides the consumer chosen round-robin
+               final SimpleString groupID = extractGroupID(ref);
+               groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-               SimpleString groupID = extractGroupID(ref);
-
-               if (groupID != null) {
-                  groupConsumer = groups.get(groupID);
-
-                  if (groupConsumer != null) {
-                     consumer = groupConsumer;
-                  }
-               }
-
-               if (exclusive && redistributor == null) {
-                  consumer = consumerList.get(0).consumer;
+               if (groupConsumer != null) {
+                  consumer = groupConsumer;
                }
 
                HandleStatus status = handle(ref, consumer);
 
                if (status == HandleStatus.HANDLED) {
 
-                  deliveriesInTransit.countUp();
-
-                  handledconsumer = consumer;
-
-                  removeMessageReference(holder, ref);
-
                   if (redistributor == null) {
                      handleMessageGroup(ref, consumer, groupConsumer, groupID);
                   }
 
+                  deliveriesInTransit.countUp();
+
+
+                  removeMessageReference(holder, ref);
+                  handledconsumer = consumer;
                   handled++;
+                  consumers.reset();
                } else if (status == HandleStatus.BUSY) {
                   try {
                      holder.iter.repeat();
@@ -2550,18 +2538,19 @@ private void deliver() {
                   noDelivery++;
                } else if (status == HandleStatus.NO_MATCH) {
                   // nothing to be done on this case, the iterators will just jump next
+                  consumers.reset();
                }
             }
 
-            if (redistributor != null || groupConsumer != null || exclusive) {
+            if (redistributor != null || groupConsumer != null || exclusiveConsumer != null) {
                if (noDelivery > 0) {
                   break;
                }
                noDelivery = 0;
-            } else if (pos == endPos) {
+            } else if (!consumers.hasNext()) {
                // Round robin'd all
 
-               if (noDelivery == size) {
+               if (noDelivery == this.consumers.size()) {
                   if (handledconsumer != null) {
                      // this shouldn't really happen,
                      // however I'm keeping this as an assertion case future developers ever change the logic here on this class
@@ -2576,16 +2565,6 @@ private void deliver() {
 
                noDelivery = 0;
             }
-
-            // Only move onto the next position if the consumer on the current position was used.
-            // When using group we don't need to load balance to the next position
-            if (redistributor == null && !exclusive && groupConsumer == null) {
-               pos++;
-            }
-
-            if (pos >= size) {
-               pos = 0;
-            }
          }
 
          if (handledconsumer != null) {
@@ -2626,7 +2605,6 @@ private SimpleString extractGroupID(MessageReference ref) {
          return null;
       } else {
          try {
-            // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
             return ref.getMessage().getGroupID();
          } catch (Throwable e) {
             ActiveMQServerLogger.LOGGER.unableToExtractGroupID(e);
@@ -2727,15 +2705,13 @@ private void depage(final boolean scheduleExpiry) {
 
    private void internalAddRedistributor(final ArtemisExecutor executor) {
       // create the redistributor only once if there are no local consumers
-      if (consumerSet.isEmpty() && redistributor == null) {
+      if (consumers.isEmpty() && redistributor == null) {
          if (logger.isTraceEnabled()) {
             logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
          }
 
          redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
 
-         consumersChanged = true;
-
          redistributor.consumer.start();
 
          deliverAsync();
@@ -3067,9 +3043,6 @@ private void move(final Transaction originalTX,
    private boolean deliverDirect(final MessageReference ref) {
       synchronized (this) {
          if (!supportsDirectDeliver) {
-            // this should never happen, but who knows?
-            // if someone ever change add and removeConsumer,
-            // this would protect any eventual bug
             return false;
          }
          if (paused || !canDispatch() && redistributor == null) {
@@ -3080,45 +3053,20 @@ private boolean deliverDirect(final MessageReference ref) {
             return true;
          }
 
-         int startPos = pos;
-
-         int size = consumerList.size();
+         consumers.reset();
 
-         while (true) {
-            ConsumerHolder<? extends Consumer> holder;
-            if (redistributor == null) {
-               holder = consumerList.get(pos);
-            } else {
-               holder = redistributor;
-            }
+         while (consumers.hasNext() || redistributor != null) {
 
+            ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
             Consumer consumer = holder.consumer;
 
             Consumer groupConsumer = null;
 
-            // If a group id is set, then this overrides the consumer chosen round-robin
-
-            SimpleString groupID = extractGroupID(ref);
-
-            if (groupID != null) {
-               groupConsumer = groups.get(groupID);
+            final SimpleString groupID = extractGroupID(ref);
+            groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-               if (groupConsumer != null) {
-                  consumer = groupConsumer;
-               }
-            }
-
-            if (exclusive && redistributor == null) {
-               consumer = consumerList.get(0).consumer;
-            }
-
-            // Only move onto the next position if the consumer on the current position was used.
-            if (redistributor == null && !exclusive && groupConsumer == null) {
-               pos++;
-            }
-
-            if (pos == size) {
-               pos = 0;
+            if (groupConsumer != null) {
+               consumer = groupConsumer;
             }
 
             HandleStatus status = handle(ref, consumer);
@@ -3133,25 +3081,46 @@ private boolean deliverDirect(final MessageReference ref) {
 
                deliveriesInTransit.countUp();
                proceedDeliver(consumer, ref);
+               consumers.reset();
                return true;
             }
 
-            if (pos == startPos || redistributor != null || groupConsumer != null || exclusive) {
-               // Tried them all
+            if (redistributor != null || groupConsumer != null || exclusiveConsumer != null) {
                break;
             }
          }
+
+
          return false;
       }
    }
 
-   private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
-      if (groupID != null) {
-         if (extractGroupSequence(ref) == -1) {
-            groups.remove(groupID);
+   private Consumer getGroupConsumer(Consumer groupConsumer, SimpleString groupID) {
+      if (exclusive) {
+         // If exclusive is set, then this overrides the consumer chosen round-robin
+         groupConsumer = exclusiveConsumer;
+      } else {
+         // If a group id is set, then this overrides the consumer chosen round-robin
+         if (groupID != null) {
+            groupConsumer = groups.get(groupID);
          }
+      }
+      return groupConsumer;
+   }
+
+   private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
+      if (exclusive) {
          if (groupConsumer == null) {
-            groups.put(groupID, consumer);
+            exclusiveConsumer = consumer;
+         }
+      } else {
+         if (groupID != null) {
+            if (extractGroupSequence(ref) == -1) {
+               groups.remove(groupID);
+            }
+            if (groupConsumer == null) {
+               groups.put(groupID, consumer);
+            }
          }
       }
    }
@@ -3225,19 +3194,6 @@ private synchronized HandleStatus handle(final MessageReference reference, final
       return status;
    }
 
-   private List<ConsumerHolder> cloneConsumersList() {
-      List<ConsumerHolder> consumerListClone;
-
-      synchronized (this) {
-         if (redistributor == null) {
-            consumerListClone = new ArrayList<>(consumerList);
-         } else {
-            consumerListClone = Collections.singletonList(redistributor);
-         }
-      }
-      return consumerListClone;
-   }
-
    @Override
    public void postAcknowledge(final MessageReference ref) {
       QueueImpl queue = (QueueImpl) ref.getQueue();
@@ -3387,7 +3343,7 @@ public void onError(int errorCode, String errorMessage) {
    // Inner classes
    // --------------------------------------------------------------------------
 
-   protected static class ConsumerHolder<T extends  Consumer> {
+   protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
 
       ConsumerHolder(final T consumer) {
          this.consumer = consumer;
@@ -3404,6 +3360,27 @@ private void resetIterator() {
          iter = null;
       }
 
+      private Consumer consumer() {
+         return consumer;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+         ConsumerHolder<?> that = (ConsumerHolder<?>) o;
+         return Objects.equals(consumer, that.consumer);
+      }
+
+      @Override
+      public int hashCode() {
+         return Objects.hash(consumer);
+      }
+
+      @Override
+      public int getPriority() {
+         return consumer.getPriority();
+      }
    }
 
    private class DelayedAddRedistributor implements Runnable {
@@ -3722,19 +3699,19 @@ public void run() {
             logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
          }
 
-         Set<Consumer> consumersSet = getConsumers();
 
-         if (consumersSet.size() == 0) {
+         if (consumers.size() == 0) {
             logger.debug("There are no consumers, no need to check slow consumer's rate");
             return;
-         } else if (queueRate < (threshold * consumersSet.size())) {
+         } else if (queueRate < (threshold * consumers.size())) {
             if (logger.isDebugEnabled()) {
                logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
             }
             return;
          }
 
-         for (Consumer consumer : consumersSet) {
+         for (ConsumerHolder consumerHolder : consumers) {
+            Consumer consumer = consumerHolder.consumer();
             if (consumer instanceof ServerConsumerImpl) {
                ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
                float consumerRate = serverConsumer.getRate();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
new file mode 100644
index 0000000000..b221235fc7
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Iterator;
+
+public interface ResetableIterator<T> extends Iterator<T> {
+
+   /**
+    * Resets the iterator so you can re-iterate over all elements.
+    *
+    * @return itself, this is just for convenience.
+    */
+   ResetableIterator<T> reset();
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index f7a89d7a66..de97a5b677 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -86,6 +87,8 @@
 
    private final Filter filter;
 
+   private final int priority;
+
    private final int minLargeMessageSize;
 
    private final ServerSession session;
@@ -189,12 +192,32 @@ public ServerConsumerImpl(final long id,
                              final boolean supportLargeMessage,
                              final Integer credits,
                              final ActiveMQServer server) throws Exception {
+      this(id, session, binding, filter, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
+   }
+
+   public ServerConsumerImpl(final long id,
+                             final ServerSession session,
+                             final QueueBinding binding,
+                             final Filter filter,
+                             final int priority,
+                             final boolean started,
+                             final boolean browseOnly,
+                             final StorageManager storageManager,
+                             final SessionCallback callback,
+                             final boolean preAcknowledge,
+                             final boolean strictUpdateDeliveryCount,
+                             final ManagementService managementService,
+                             final boolean supportLargeMessage,
+                             final Integer credits,
+                             final ActiveMQServer server) throws Exception {
       this.id = id;
 
       this.sequentialID = server.getStorageManager().generateID();
 
       this.filter = filter;
 
+      this.priority = priority;
+
       this.session = session;
 
       this.binding = binding;
@@ -479,6 +502,11 @@ public Filter getFilter() {
       return filter;
    }
 
+   @Override
+   public int getPriority() {
+      return priority;
+   }
+
    @Override
    public SimpleString getFilterString() {
       return filter == null ? null : filter.getFilterString();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 3bc60f2084..238ba292eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.Closeable;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -446,6 +447,17 @@ public ServerConsumer createConsumer(final long consumerID,
                                         final boolean browseOnly,
                                         final boolean supportLargeMessage,
                                         final Integer credits) throws Exception {
+      return this.createConsumer(consumerID, queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), browseOnly, true, null);
+   }
+
+   @Override
+   public ServerConsumer createConsumer(final long consumerID,
+                                        final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final int priority,
+                                        final boolean browseOnly,
+                                        final boolean supportLargeMessage,
+                                        final Integer credits) throws Exception {
       final SimpleString unPrefixedQueueName = removePrefix(queueName);
 
       Binding binding = postOffice.getBinding(unPrefixedQueueName);
@@ -476,7 +488,7 @@ public ServerConsumer createConsumer(final long consumerID,
                filterString, browseOnly, supportLargeMessage));
       }
 
-      ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
+      ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
       consumers.put(consumer.getID(), consumer);
 
       if (server.hasBrokerConsumerPlugins()) {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java
new file mode 100644
index 0000000000..33676c4b57
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class QueueConsumersImplTest {
+
+   private QueueConsumers<TestPriority> queueConsumers;
+
+   @Before
+   public void setUp() {
+      queueConsumers = new QueueConsumersImpl<>();
+   }
+
+   @Test
+   public void addTest() {
+      TestPriority testPriority = new TestPriority("hello", 0);
+      assertFalse(queueConsumers.hasNext());
+
+      queueConsumers.add(testPriority);
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+
+      assertEquals(testPriority, queueConsumers.next());
+   }
+
+   @Test
+   public void removeTest() {
+      TestPriority testPriority = new TestPriority("hello", 0);
+      assertFalse(queueConsumers.hasNext());
+
+      queueConsumers.add(testPriority);
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+
+      queueConsumers.remove(testPriority);
+      queueConsumers.reset();
+      assertFalse(queueConsumers.hasNext());
+
+      assertEquals(0, queueConsumers.getPriorites().size());
+      queueConsumers.remove(testPriority);
+      queueConsumers.remove(testPriority);
+
+   }
+
+
+
+   @Test
+   public void roundRobinTest() {
+      queueConsumers.add(new TestPriority("A", 127));
+      queueConsumers.add(new TestPriority("B", 127));
+      queueConsumers.add(new TestPriority("E", 0));
+      queueConsumers.add(new TestPriority("D", 20));
+      queueConsumers.add(new TestPriority("C", 127));
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+
+      assertEquals("A", queueConsumers.next().getName());
+
+      //Reset iterator should mark start as current position
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("B", queueConsumers.next().getName());
+
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("C", queueConsumers.next().getName());
+
+      //Expect another A as after reset, we started at B so after A we then expect the next level
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("A", queueConsumers.next().getName());
+
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("D", queueConsumers.next().getName());
+
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("E", queueConsumers.next().getName());
+
+      //We have iterated all.
+      assertFalse(queueConsumers.hasNext());
+
+      //Reset to iterate again.
+      queueConsumers.reset();
+
+      //We expect the iteration to round robin from last returned at the level.
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("B", queueConsumers.next().getName());
+
+
+   }
+
+
+
+
+
+   private class TestPriority implements PriorityAware {
+
+      private final int priority;
+      private final String name;
+
+      private TestPriority(String name, int priority) {
+         this.priority = priority;
+         this.name = name;
+      }
+
+      @Override
+      public int getPriority() {
+         return priority;
+      }
+
+      public String getName() {
+         return name;
+      }
+   }
+
+}
\ No newline at end of file
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index d86ed8582c..0aa3085155 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -41,6 +41,7 @@
 * [Last-Value Queues](last-value-queues.md)
 * [Exclusive Queues](exclusive-queues.md)
 * [Message Grouping](message-grouping.md)
+* [Consumer Priority](consumer-priority.md)
 * [Extra Acknowledge Modes](pre-acknowledge.md)
 * [Management](management.md)
 * [Management Console](management-console.md)
diff --git a/docs/user-manual/en/consumer-priority.md b/docs/user-manual/en/consumer-priority.md
new file mode 100644
index 0000000000..a7a5ca7dc1
--- /dev/null
+++ b/docs/user-manual/en/consumer-priority.md
@@ -0,0 +1,45 @@
+# Consumer Priority
+
+Consumer priorities allow you to ensure that high priority consumers receive messages while they are active.
+
+Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.
+
+Messages will only go to lower priority consumers when the high priority consumers do not have credit available to consume the message, or those high priority consumers have declined to accept the message (for instance because it does not meet the criteria of any selectors associated with the consumer).
+
+Where a consumer does not set a priority, the default priority <b>0</b> is used.
+
+## Core
+
+#### JMS Example
+
+
+When using the JMS Client you can set the priority to be used, by using address parameters when
+creating the destination used by the consumer.
+
+```java
+Queue queue = session.createQueue("my.destination.name?consumer-priority=50");
+Topic topic = session.createTopic("my.destination.name?consumer-priority=50");
+
+consumer = session.createConsumer(queue);
+```
+
+The range of priority values is -2<sup>31</sup> to 2<sup>31</sup>-1.
+
+## OpenWire
+
+#### JMS Example
+
+The priority for a consumer is set using Destination Options as follows:
+
+```java
+queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
+consumer = session.createConsumer(queue);
+```
+
+Because of the limitation of OpenWire, the range of priority values is: 0 to 127. The highest priority is 127.
+
+## AMQP
+
+In AMQP 1.0 the priority of the consumer is set in the properties map of the attach frame where the broker side of the link represents the sending side of the link.
+
+The key for the entry must be the literal string priority, and the value of the entry must be an integral number in the range -2<sup>31</sup> to 2<sup>31</sup>-1.
\ No newline at end of file
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index fb4e4daa04..dc83ef751e 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -79,6 +79,7 @@
    private String selector;
    private boolean presettle;
    private boolean noLocal;
+   private Map<Symbol, Object> properties;
 
    private AsyncResult pullRequest;
    private AsyncResult stopRequest;
@@ -173,6 +174,14 @@ public void run() {
       }
    }
 
+   public void setProperties(Map<Symbol, Object> properties) {
+      if (getEndpoint() != null) {
+         throw new IllegalStateException("Endpoint already established");
+      }
+
+      this.properties = properties;
+   }
+
    /**
     * Detach the receiver, a closed receiver will throw exceptions if any further send calls are
     * made.
@@ -782,7 +791,9 @@ protected void doOpen() {
       } else {
          receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
       }
-
+      if (properties != null) {
+         receiver.setProperties(properties);
+      }
       setEndpoint(receiver);
 
       super.doOpen();
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 8c331cae17..53d45e33b2 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -306,6 +306,12 @@ public AmqpReceiver createReceiver(String address, String selector, boolean noLo
       return createReceiver(address, selector, noLocal, false);
    }
 
+   public AmqpReceiver createReceiver(String address,
+                                      String selector,
+                                      boolean noLocal,
+                                      boolean presettle) throws Exception {
+      return createReceiver(address, selector, noLocal, presettle, null);
+   }
    /**
     * Create a receiver instance using the given address
     *
@@ -313,13 +319,15 @@ public AmqpReceiver createReceiver(String address, String selector, boolean noLo
     * @param selector  the JMS selector to use for the subscription
     * @param noLocal   should the subscription have messages from its connection filtered.
     * @param presettle should the receiver be created with a settled sender mode.
+    * @param properties to set on the receiver
     * @return a newly created receiver that is ready for use.
     * @throws Exception if an error occurs while creating the receiver.
     */
    public AmqpReceiver createReceiver(String address,
                                       String selector,
                                       boolean noLocal,
-                                      boolean presettle) throws Exception {
+                                      boolean presettle,
+                                      Map<Symbol, Object> properties) throws Exception {
       checkClosed();
 
       final ClientFuture request = new ClientFuture();
@@ -330,6 +338,9 @@ public AmqpReceiver createReceiver(String address,
       if (selector != null && !selector.isEmpty()) {
          receiver.setSelector(selector);
       }
+      if (properties != null && !properties.isEmpty()) {
+         receiver.setProperties(properties);
+      }
 
       connection.getScheduler().execute(new Runnable() {
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
new file mode 100644
index 0000000000..e268cc4eb0
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 30000)
+   public void testPriority() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Map<Symbol, Object> properties1 = new HashMap<>();
+      properties1.put(Symbol.getSymbol("priority"), 50);
+      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
+      receiver1.flow(100);
+
+      Map<Symbol, Object> properties2 = new HashMap<>();
+      properties2.put(Symbol.getSymbol("priority"), 10);
+      AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
+      receiver2.flow(100);
+
+      Map<Symbol, Object> properties3 = new HashMap<>();
+      properties3.put(Symbol.getSymbol("priority"), 5);
+      AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
+      receiver3.flow(100);
+
+      sendMessages(getQueueName(), 5);
+
+
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS);
+         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
+         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
+         assertNotNull("did not receive message first time", message1);
+         assertEquals("MessageID:" + i, message1.getMessageId());
+         message1.accept();
+         assertNull("message is not meant to goto lower priority receiver", message2);
+         assertNull("message is not meant to goto lower priority receiver", message3);
+      }
+
+      //Close the high priority receiver
+      receiver1.close();
+
+      sendMessages(getQueueName(), 5);
+
+      //Check messages now goto next priority receiver
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
+         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
+         assertNotNull("did not receive message first time", message2);
+         assertEquals("MessageID:" + i, message2.getMessageId());
+         message2.accept();
+         assertNull("message is not meant to goto lower priority receiver", message3);
+      }
+
+
+      connection.close();
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
new file mode 100644
index 0000000000..9a7a75eee6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Consumer Priority Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.priority.queue");
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, false, false,true);
+   }
+
+
+   protected ConnectionFactory getCF() throws Exception {
+      return cf;
+   }
+
+   @Test
+   public void testConsumerPriorityQueueConsumerSettingUsingAddressQueueParameters() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(queueName.toString());
+         Queue queue1 = session.createQueue(queueName.toString() + "?consumer-priority=3");
+         Queue queue2 = session.createQueue(queueName.toString() + "?consumer-priority=2");
+         Queue queue3 = session.createQueue(queueName.toString() + "?consumer-priority=1");
+
+         assertEquals(queueName.toString(), queue.getQueueName());
+
+         ActiveMQDestination b = (ActiveMQDestination) queue1;
+         assertEquals(Byte.valueOf("3"), b.getQueueAttributes().getConsumerPriority());
+         ActiveMQDestination c = (ActiveMQDestination) queue2;
+         assertEquals(Byte.valueOf("2"), c.getQueueAttributes().getConsumerPriority());
+         ActiveMQDestination d = (ActiveMQDestination) queue3;
+         assertEquals(Byte.valueOf("1"), d.getQueueAttributes().getConsumerPriority());
+
+         MessageProducer producer = session.createProducer(queue);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+         MessageConsumer consumer3 = session.createConsumer(queue3);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first consumer
+         for (int j = 0; j < 100; j++) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer2.receiveNoWait();
+            assertNull(tm);
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testConsumerPriorityQueueConsumerRoundRobin() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(queueName.toString());
+         Queue queue1 = session.createQueue(queueName.toString() + "?consumer-priority=3");
+         Queue queue2 = session.createQueue(queueName.toString() + "?consumer-priority=3");
+         Queue queue3 = session.createQueue(queueName.toString() + "?consumer-priority=1");
+
+
+         MessageProducer producer = session.createProducer(queue);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+         MessageConsumer consumer3 = session.createConsumer(queue3);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+            message.setIntProperty("counter", j);
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first two consumers, round robin'd
+         for (int j = 0; j < 50; j += 2) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            TextMessage tm2 = (TextMessage) consumer2.receive(10000);
+            assertNotNull(tm2);
+
+            assertEquals("Message" + (j + 1), tm2.getText());
+
+
+
+            TextMessage tm3 = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm3);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testConsumerPriorityQueueConsumerFailover() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(queueName.toString());
+         Queue queue1 = session.createQueue(queueName.toString() + "?consumer-priority=3");
+         Queue queue2 = session.createQueue(queueName.toString() + "?consumer-priority=2");
+         Queue queue3 = session.createQueue(queueName.toString() + "?consumer-priority=1");
+
+
+         MessageProducer producer = session.createProducer(queue);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+         MessageConsumer consumer3 = session.createConsumer(queue3);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first consumer
+         for (int j = 0; j < 50; j++) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer2.receiveNoWait();
+            assertNull(tm);
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+         consumer1.close();
+
+         //All msgs should now go to the next consumer only, without any errors or exceptions
+         for (int j = 50; j < 100; j++) {
+            TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testConsumerPriorityTopicSharedConsumerFailover() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+      String topicName = "mytopic";
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Destination topic = session.createTopic(topicName);
+         MessageProducer producer = session.createProducer(topic);
+
+         String subscriptionName = "sharedsub";
+         Topic topicConsumer1 = session.createTopic(topicName + "?consumer-priority=3");
+         Topic topicConsumer2 = session.createTopic(topicName + "?consumer-priority=2");
+         Topic topicConsumer3 = session.createTopic(topicName + "?consumer-priority=1");
+
+
+         MessageConsumer consumer1 = session.createSharedDurableConsumer(topicConsumer1, subscriptionName);
+         MessageConsumer consumer2 = session.createSharedDurableConsumer(topicConsumer2, subscriptionName);
+         MessageConsumer consumer3 = session.createSharedDurableConsumer(topicConsumer3, subscriptionName);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first consumer
+         for (int j = 0; j < 50; j++) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer2.receiveNoWait();
+            assertNull(tm);
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+         consumer1.close();
+
+         //All msgs should now go to the next consumer only, without any errors or exceptions
+         for (int j = 50; j < 100; j++) {
+            TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+
+
+      } finally {
+         connection.close();
+      }
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
new file mode 100644
index 0000000000..5d3b4767df
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, InterruptedException {
+      connection.start();
+      Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(consumerHighPriority);
+      Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      String queueName = "QUEUE.A";
+      ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1");
+      MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low);
+
+      ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2");
+      MessageConsumer highConsumer = consumerLowPriority.createConsumer(high);
+
+      ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+      MessageProducer producer = senderSession.createProducer(senderQueue);
+
+      Message msg = senderSession.createTextMessage("test");
+      for (int i = 0; i < 1000; i++) {
+         producer.send(msg);
+         assertNotNull("null on iteration: " + i, highConsumer.receive(1000));
+      }
+      assertNull(lowConsumer.receive(2000));
+   }
+}
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
index 7a91c21420..ecd0f78fd2 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
@@ -1174,6 +1174,11 @@ public ClientConsumer createConsumer(final SimpleString queueName,
          return null;
       }
 
+      @Override
+      public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, boolean browseOnly) throws ActiveMQException {
+         return null;
+      }
+
       @Override
       public ClientConsumer createConsumer(final SimpleString queueName,
                                            final SimpleString filterString,
@@ -1183,6 +1188,11 @@ public ClientConsumer createConsumer(final SimpleString queueName,
          return null;
       }
 
+      @Override
+      public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, int windowSize, int maxRate, boolean browseOnly) throws ActiveMQException {
+         return null;
+      }
+
       @Override
       public ClientConsumer createConsumer(final String queueName) throws ActiveMQException {
          return null;
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
index a68382f79c..a9a39c131b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
@@ -746,6 +746,11 @@ public SimpleString getFilterString() {
          return null;
       }
 
+      @Override
+      public int getPriority() {
+         return 0;
+      }
+
       public long getID() {
 
          return 0;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services