--- /cygdrive/c/queue2/apache-activemq-5.5.0-fuse-00-43/src/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java 2011-07-10 20:35:04.000000000 -0400
+++ Queue.java 2011-08-02 13:58:29.983355600 -0400
@@ -1796,6 +1796,51 @@
             consumersLock.writeLock().unlock();
         }
 
+        if (isPrioritizedMessages()) {
+
+            // Ensure we have a non-full consumer to dispatch
+            // to prior to sorting since sorting is a heavy
+            // operation. Just return the list if no
+            // non-full consumers exist since we cannot
+            // dispatch at this point.
+            boolean notFullConsumer = false;
+            for (Subscription subscription : consumers) {
+                if (!subscription.isFull()) {
+                    notFullConsumer = true;
+                    break;
+                }
+            }
+            if (!notFullConsumer) {
+                return list;
+            }
+
+            // Sort the pending messages so that higher-priority
+            // messages come first. Collections.sort is stable, so
+            // messages of equal priority keep their relative order.
+            Comparator<QueueMessageReference> priorityComparator = new Comparator<QueueMessageReference>() {
+
+                @Override
+                public int compare(QueueMessageReference qmr1, QueueMessageReference qmr2) {
+
+                    if (qmr1.getMessage().getPriority() < qmr2.getMessage().getPriority()) {
+                        return 1;
+                    } else if (qmr1.getMessage().getPriority() > qmr2.getMessage().getPriority()) {
+                        return -1;
+                    } else {
+                        return 0;
+                    }
+                }
+            };
+
+            long sortStartTime = System.currentTimeMillis();
+            Collections.sort(list, priorityComparator);
+            if (LOG.isDebugEnabled()) {
+                long sortEndTime = System.currentTimeMillis();
+                long sortTime = sortEndTime - sortStartTime;
+                LOG.debug("Sorted " + list.size() + " messages in " + sortTime + " milliseconds");
+            }
+        }
+
         List rc = new ArrayList(list.size());
         Set fullConsumers = new HashSet(this.consumers.size());
 