[1/6] activemq-artemis git commit: ARTEMIS-1663 - Add new message count and size metrics

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/6] activemq-artemis git commit: ARTEMIS-1663 - Add new message count and size metrics

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 2eac1959d -> 4ef6e3281


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java
new file mode 100644
index 0000000..10620e5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPageCountSizeTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence.metrics;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JournalPageCountSizeTest extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   @Before
+   public void init() throws Exception {
+      server = createServer(true);
+
+      server.start();
+   }
+
+   @Override
+   protected ConfigurationImpl createBasicConfig(int serverID) {
+      return super.createBasicConfig(serverID);
+   }
+
+   @After
+   public void destroy() throws Exception {
+      server.stop();
+   }
+
+   @Test
+   public void testPageCountRecordSize() throws Exception {
+
+      long tx = server.getStorageManager().generateID();
+      server.getStorageManager().storePageCounter(tx, 1, 1, 100);
+      server.getStorageManager().commit(tx);
+      server.getStorageManager().stop();
+
+      JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
+      List<RecordInfo> committedRecords = new LinkedList<>();
+      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
+
+      try {
+         journalStorageManager.getMessageJournal().start();
+         journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
+
+         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
+         PageCountRecord encoding = new PageCountRecord();
+         encoding.decode(buff);
+
+         Assert.assertEquals(100, encoding.getPersistentSize());
+      } finally {
+         journalStorageManager.getMessageJournal().stop();
+      }
+
+   }
+
+   @Test
+   public void testPageCursorCounterRecordSize() throws Exception {
+
+      server.getStorageManager().storePageCounterInc(1, 1, 1000);
+      server.getStorageManager().stop();
+
+      JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
+      List<RecordInfo> committedRecords = new LinkedList<>();
+      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
+
+      try {
+         journalStorageManager.getMessageJournal().start();
+         journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
+
+         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
+         PageCountRecordInc encoding = new PageCountRecordInc();
+         encoding.decode(buff);
+
+         Assert.assertEquals(1000, encoding.getPersistentSize());
+      } finally {
+         journalStorageManager.getMessageJournal().stop();
+      }
+
+   }
+
+   @Test
+   public void testPageCursorCounterRecordSizeTX() throws Exception {
+
+      long tx = server.getStorageManager().generateID();
+      server.getStorageManager().storePageCounterInc(tx, 1, 1, 1000);
+      server.getStorageManager().commit(tx);
+      server.getStorageManager().stop();
+
+      JournalStorageManager journalStorageManager = (JournalStorageManager) server.getStorageManager();
+      List<RecordInfo> committedRecords = new LinkedList<>();
+      List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
+
+      try {
+         journalStorageManager.getMessageJournal().start();
+         journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
+
+         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(committedRecords.get(0).data);
+         PageCountRecordInc encoding = new PageCountRecordInc();
+         encoding.decode(buff);
+
+         Assert.assertEquals(1000, encoding.getPersistentSize());
+      } finally {
+         journalStorageManager.getMessageJournal().stop();
+      }
+
+   }
+
+   private TransactionFailureCallback transactionFailure = new TransactionFailureCallback() {
+      @Override
+      public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete) {
+
+      }
+   };
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
new file mode 100644
index 0000000..7bad309
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/JournalPendingMessageTest.java
@@ -0,0 +1,651 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.ToLongFunction;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
+import org.apache.activemq.artemis.junit.Wait.Condition;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.commons.lang.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JournalPendingMessageTest extends AbstractPersistentStatTestSupport {
+   protected static final Logger LOG = LoggerFactory.getLogger(JournalPendingMessageTest.class);
+
+   // protected URI brokerConnectURI;
+   protected String defaultQueueName = "test.queue";
+   protected String defaultTopicName = "test.topic";
+   protected static int maxMessageSize = 1000;
+
+   @Before
+   public void setupAddresses() throws Exception {
+      server.getPostOffice()
+            .addAddressInfo(new AddressInfo(SimpleString.toSimpleString(defaultQueueName), RoutingType.ANYCAST));
+
+      server.createQueue(SimpleString.toSimpleString(defaultQueueName), RoutingType.ANYCAST,
+            SimpleString.toSimpleString(defaultQueueName), null, true, false);
+
+   }
+
+   @Override
+   protected Configuration createDefaultConfig(boolean netty) throws Exception {
+      Configuration config = super.createDefaultConfig(netty);
+
+      // Set a low max size so we page which will test the paging metrics as
+      // well
+      config.setGlobalMaxSize(100000);
+
+      return config;
+   }
+
+   @Test
+   public void testQueueMessageSize() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      publishTestQueueMessages(200, publishedMessageSize);
+
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+
+      this.killServer();
+      this.restartServer();
+
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+   }
+
+   @Test
+   public void testQueueMessageSizeTx() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      publishTestQueueMessagesTx(200, publishedMessageSize);
+
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+
+      this.killServer();
+      this.restartServer();
+
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+   }
+
+   @Test
+   public void testQueueLargeMessageSize() throws Exception {
+
+      ActiveMQConnectionFactory acf = (ActiveMQConnectionFactory) cf;
+      acf.setMinLargeMessageSize(1000);
+      Connection connection = cf.createConnection();
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      String testText = StringUtils.repeat("t", 5000);
+      ActiveMQTextMessage message = (ActiveMQTextMessage) session.createTextMessage(testText);
+      session.createProducer(session.createQueue(defaultQueueName)).send(message);
+
+      verifyPendingStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+      verifyPendingDurableStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+
+      connection.close();
+
+      this.killServer();
+      this.restartServer();
+
+      verifyPendingStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+      verifyPendingDurableStats(defaultQueueName, 1, message.getCoreMessage().getPersistentSize());
+
+   }
+
+   @Test
+   public void testQueueLargeMessageSizeTX() throws Exception {
+
+      ActiveMQConnectionFactory acf = (ActiveMQConnectionFactory) cf;
+      acf.setMinLargeMessageSize(1000);
+      Connection connection = cf.createConnection();
+      connection.start();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      String testText = StringUtils.repeat("t", 2000);
+      MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
+      ActiveMQTextMessage message = (ActiveMQTextMessage) session.createTextMessage(testText);
+      for (int i = 0; i < 10; i++) {
+         producer.send(message);
+      }
+
+      //not commited so should be 0
+      verifyPendingStats(defaultQueueName, 0, message.getCoreMessage().getPersistentSize() * 10);
+      verifyPendingDurableStats(defaultQueueName, 0, message.getCoreMessage().getPersistentSize() * 10);
+
+      session.commit();
+
+      verifyPendingStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize() * 10);
+      verifyPendingDurableStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize() * 10);
+
+      connection.close();
+
+      this.killServer();
+      this.restartServer();
+
+      verifyPendingStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize());
+      verifyPendingDurableStats(defaultQueueName, 10, message.getCoreMessage().getPersistentSize());
+   }
+
+   @Test
+   public void testQueueBrowserMessageSize() throws Exception {
+
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      publishTestQueueMessages(200, publishedMessageSize);
+      browseTestQueueMessages(defaultQueueName);
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+   }
+
+   @Test
+   public void testQueueMessageSizeNonPersistent() throws Exception {
+
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 0, 0);
+   }
+
+   @Test
+   public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
+
+      AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      publishTestQueueMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
+      publishTestQueueMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 100, publishedMessageSize.get());
+   }
+
+   @Test
+   public void testQueueMessageSizeAfterConsumption() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      publishTestQueueMessages(200, publishedMessageSize);
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 200, publishedMessageSize.get());
+
+      consumeTestQueueMessages(200);
+
+      verifyPendingStats(defaultQueueName, 0, 0);
+      verifyPendingDurableStats(defaultQueueName, 0, 0);
+   }
+
+   @Test
+   public void testScheduledStats() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
+      producer.setDeliveryDelay(2000);
+      producer.send(session.createTextMessage("test"));
+
+      verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
+      verifyScheduledStats(defaultQueueName, 1, publishedMessageSize.get());
+
+      consumeTestQueueMessages(1);
+
+      verifyPendingStats(defaultQueueName, 0, 0);
+      verifyPendingDurableStats(defaultQueueName, 0, 0);
+      verifyScheduledStats(defaultQueueName, 0, 0);
+
+      connection.close();
+   }
+
+
+   @Test
+   public void testDeliveringStats() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.start();
+      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
+      producer.send(session.createTextMessage("test"));
+
+      verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
+      verifyDeliveringStats(defaultQueueName, 0, 0);
+
+      MessageConsumer consumer = session.createConsumer(session.createQueue(defaultQueueName));
+      Message msg = consumer.receive();
+      verifyDeliveringStats(defaultQueueName, 1, publishedMessageSize.get());
+      msg.acknowledge();
+
+      verifyPendingStats(defaultQueueName, 0, 0);
+      verifyPendingDurableStats(defaultQueueName, 0, 0);
+      verifyDeliveringStats(defaultQueueName, 0, 0);
+
+      connection.close();
+   }
+   @Test
+   public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+      verifyPendingStats(defaultQueueName, 200, publishedMessageSize.get());
+
+      consumeTestQueueMessages(200);
+
+      verifyPendingStats(defaultQueueName, 0, 0);
+      verifyPendingDurableStats(defaultQueueName, 0, 0);
+   }
+
+   @Test
+   public void testTopicMessageSize() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
+
+      publishTestTopicMessages(200, publishedMessageSize);
+
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultQueueName, 0, 0);
+
+      // consume all messages
+      consumeTestMessages(consumer, 200);
+
+      // All messages should now be gone
+      verifyPendingStats(defaultTopicName, 0, 0);
+      verifyPendingDurableStats(defaultQueueName, 0, 0);
+
+      connection.close();
+   }
+
+   @Test
+   public void testTopicMessageSizeShared() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createSharedConsumer(session.createTopic(defaultTopicName), "sub1");
+      MessageConsumer consumer2 = session.createSharedConsumer(session.createTopic(defaultTopicName), "sub1");
+
+      publishTestTopicMessages(200, publishedMessageSize);
+
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 0, 0);
+      consumer2.close();
+
+      // consume all messages
+      consumeTestMessages(consumer, 200);
+
+      // All messages should now be gone
+      verifyPendingStats(defaultTopicName, 0, 0);
+      verifyPendingDurableStats(defaultTopicName, 0, 0);
+
+      connection.close();
+   }
+
+   @Test
+   public void testTopicNonPersistentMessageSize() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
+
+      publishTestTopicMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+
+      // consume all messages
+      consumeTestMessages(consumer, 200);
+
+      // All messages should now be gone
+      verifyPendingStats(defaultTopicName, 0, 0);
+
+      connection.close();
+   }
+
+   @Test
+   public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+      AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
+
+      publishTestTopicMessages(100, DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
+      publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
+
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
+
+      // consume all messages
+      consumeTestMessages(consumer, 200);
+
+      // All messages should now be gone
+      verifyPendingStats(defaultTopicName, 0, 0);
+
+      connection.close();
+   }
+
+   @Test
+   public void testMessageSizeOneDurable() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+
+      publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, publishedMessageSize,
+            DeliveryMode.PERSISTENT, false);
+
+      // verify the count and size - durable is offline so all 200 should be
+      // pending since none are in prefetch
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+
+      // consume all messages
+      consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+      // All messages should now be gone
+      verifyPendingStats(defaultTopicName, 0, 0);
+      verifyPendingDurableStats(defaultTopicName, 0, 0);
+
+      connection.close();
+   }
+
+   @Test
+   public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+
+      publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, publishedMessageSize,
+            DeliveryMode.PERSISTENT, false);
+
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+
+      // consume partial messages
+      consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+      // 150 should be left
+      verifyPendingStats(defaultTopicName, 150, publishedMessageSize.get());
+      // We don't really know the size here but it should be smaller than before
+      // so take an average
+      verifyPendingDurableStats(defaultTopicName, 150, (long) (.75 * publishedMessageSize.get()));
+
+      connection.close();
+   }
+
+   @Test
+   public void testMessageSizeTwoDurables() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+
+      publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize,
+            DeliveryMode.PERSISTENT, false);
+
+      // verify the count and size - double because two durables so two queue
+      // bindings
+      verifyPendingStats(defaultTopicName, 400, 2 * publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 400, 2 * publishedMessageSize.get());
+
+      // consume messages just for sub1
+      consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+      // There is still a durable that hasn't consumed so the messages should
+      // exist
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+
+      connection.close();
+
+      // restart and verify load
+      this.killServer();
+      this.restartServer();
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+   }
+
+   @Test
+   public void testMessageSizeSharedDurable() throws Exception {
+      AtomicLong publishedMessageSize = new AtomicLong();
+
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId");
+      connection.start();
+
+      // The publish method will create a second shared consumer
+      Session s = connection.createSession();
+      MessageConsumer c = s.createSharedDurableConsumer(s.createTopic(defaultTopicName), "sub1");
+      publishTestMessagesDurable(connection, new String[] {"sub1",}, 200, publishedMessageSize,
+            DeliveryMode.PERSISTENT, true);
+
+      // verify the count and size - double because two durables so two queue
+      // bindings
+      verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
+      c.close();
+
+      // consume messages for sub1
+      consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+      verifyPendingStats(defaultTopicName, 0, publishedMessageSize.get());
+      verifyPendingDurableStats(defaultTopicName, 0, publishedMessageSize.get());
+
+      connection.close();
+   }
+
+   protected List<Queue> getQueues(final String address) throws Exception {
+      final List<Queue> queues = new ArrayList<>();
+      for (Binding binding : server.getPostOffice().getDirectBindings(SimpleString.toSimpleString(address))
+            .getBindings()) {
+         if (binding.getType() == BindingType.LOCAL_QUEUE) {
+            LocalQueueBinding queueBinding = (LocalQueueBinding) binding;
+            queues.add(queueBinding.getQueue());
+         }
+      }
+      return queues;
+   }
+
+   protected void verifyDeliveringStats(final String address, final int count, final long minimumSize) throws Exception {
+      verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDeliveringCount,
+            org.apache.activemq.artemis.core.server.Queue::getDeliveringSize);
+      verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableDeliveringCount,
+            org.apache.activemq.artemis.core.server.Queue::getDurableDeliveringSize);
+   }
+
+
+   protected void verifyScheduledStats(final String address, final int count, final long minimumSize) throws Exception {
+      verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getScheduledCount,
+            org.apache.activemq.artemis.core.server.Queue::getScheduledSize);
+      verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableScheduledCount,
+            org.apache.activemq.artemis.core.server.Queue::getDurableScheduledSize);
+   }
+
+   protected void verifyPendingStats(final String address, final int count, final long minimumSize) throws Exception {
+      verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getMessageCount,
+            org.apache.activemq.artemis.core.server.Queue::getPersistentSize);
+   }
+
+   protected void verifyPendingDurableStats(final String address, final int count, final long minimumSize)
+         throws Exception {
+
+      verifyStats(address, count, minimumSize, org.apache.activemq.artemis.core.server.Queue::getDurableMessageCount,
+            org.apache.activemq.artemis.core.server.Queue::getDurablePersistentSize);
+   }
+
+   protected void verifyStats(final String address, final int count, final long minimumSize,
+         ToLongFunction<Queue> countFunc, ToLongFunction<Queue> sizeFunc)
+         throws Exception {
+      final List<Queue> queues = getQueues(address);
+
+      assertTrue(Wait.waitFor(new Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return queues.stream().mapToLong(countFunc)
+                  .sum() == count;
+         }
+      }));
+
+      verifySize(count, new MessageSizeCalculator() {
+         @Override
+         public long getMessageSize() throws Exception {
+            return queues.stream().mapToLong(sizeFunc)
+                  .sum();
+         }
+      }, minimumSize);
+
+   }
+
+   protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator, final long minimumSize)
+         throws Exception {
+      if (count > 0) {
+         assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+               return messageSizeCalculator.getMessageSize() > minimumSize;
+            }
+         }));
+      } else {
+         assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+               return messageSizeCalculator.getMessageSize() == 0;
+            }
+         }));
+      }
+   }
+
+   protected interface MessageSizeCalculator {
+      long getMessageSize() throws Exception;
+   }
+
+   protected void consumeTestMessages(MessageConsumer consumer, int size) throws Exception {
+      consumeTestMessages(consumer, size, defaultTopicName);
+   }
+
+   protected void consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception {
+      for (int i = 0; i < size; i++) {
+         consumer.receive();
+      }
+   }
+
+   protected void consumeDurableTestMessages(Connection connection, String sub, int size,
+         AtomicLong publishedMessageSize) throws Exception {
+      consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
+   }
+
+   protected void publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize,
+         AtomicLong publishedMessageSize, int deliveryMode, boolean shared) throws Exception {
+
+      publishTestMessagesDurable(connection, subNames, defaultTopicName, publishSize, 0,
+            AbstractPersistentStatTestSupport.defaultMessageSize, publishedMessageSize, false, deliveryMode, shared);
+   }
+
+   protected void publishTestTopicMessages(int publishSize, AtomicLong publishedMessageSize) throws Exception {
+      publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize);
+   }
+
+   protected void publishTestTopicMessages(int publishSize, int deliveryMode, AtomicLong publishedMessageSize)
+         throws Exception {
+      // create a new queue
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId2");
+      connection.start();
+
+      // Start the connection
+      Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+      Topic topic = session.createTopic(defaultTopicName);
+
+      try {
+         // publish a bunch of non-persistent messages to fill up the temp
+         // store
+         MessageProducer prod = session.createProducer(topic);
+         prod.setDeliveryMode(deliveryMode);
+         for (int i = 0; i < publishSize; i++) {
+            prod.send(createMessage(i, session, JournalPendingMessageTest.maxMessageSize, publishedMessageSize));
+         }
+
+      } finally {
+         connection.close();
+      }
+   }
+
+   protected void publishTestQueueMessagesTx(int count, AtomicLong publishedMessageSize) throws Exception {
+      publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+            JournalPendingMessageTest.maxMessageSize, publishedMessageSize, true);
+   }
+
+   protected void publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
+      publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+            JournalPendingMessageTest.maxMessageSize, publishedMessageSize, false);
+   }
+
+   protected void publishTestQueueMessages(int count, int deliveryMode, AtomicLong publishedMessageSize)
+         throws Exception {
+      publishTestQueueMessages(count, defaultQueueName, deliveryMode, JournalPendingMessageTest.maxMessageSize,
+            publishedMessageSize, false);
+   }
+
+   protected void consumeTestQueueMessages(int num) throws Exception {
+      consumeTestQueueMessages(defaultQueueName, num);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 6e66057..25300d3c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -362,6 +362,21 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
       return messageCount;
    }
 
+   @Override
+   public long getPersistentSize() {
+      return 0;
+   }
+
+   @Override
+   public long getDurableMessageCount() {
+      return 0;
+   }
+
+   @Override
+   public long getDurablePersistentSize() {
+      return 0;
+   }
+
    public void setMessageCount(long messageCount) {
       this.messageCount = messageCount;
    }
@@ -453,6 +468,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public long getScheduledSize() {
+      // no-op
+      return 0;
+   }
+
+   @Override
    public List<MessageReference> getScheduledMessages() {
       // no-op
       return null;
@@ -522,7 +543,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void referenceHandled() {
+   public void referenceHandled(MessageReference ref) {
       // no-op
 
    }
@@ -684,6 +705,28 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void decDelivering(int size) {
+   public long getDeliveringSize() {
+      return 0;
    }
+
+   @Override
+   public int getDurableDeliveringCount() {
+      return 0;
+   }
+
+   @Override
+   public long getDurableDeliveringSize() {
+      return 0;
+   }
+
+   @Override
+   public int getDurableScheduledCount() {
+      return 0;
+   }
+
+   @Override
+   public long getDurableScheduledSize() {
+      return 0;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 1db8347..2a5a330 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -105,7 +105,7 @@ public class FakeConsumer implements Consumer {
          if (filter != null) {
             if (filter.match(reference.getMessage())) {
                references.addLast(reference);
-               reference.getQueue().referenceHandled();
+               reference.getQueue().referenceHandled(reference);
                notify();
 
                return HandleStatus.HANDLED;
@@ -125,7 +125,7 @@ public class FakeConsumer implements Consumer {
          }
 
          if (statusToReturn == HandleStatus.HANDLED) {
-            reference.getQueue().referenceHandled();
+            reference.getQueue().referenceHandled(reference);
             references.addLast(reference);
             notify();
          }

Reply | Threaded
Open this post in threaded view
|

[2/6] activemq-artemis git commit: ARTEMIS-1663 - Add new message count and size metrics

clebertsuconic-2
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 9c9e6f5..ebea686 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.management;
 import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
 
 import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -27,6 +29,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import javax.json.JsonArray;
 import javax.json.JsonObject;
@@ -60,27 +63,46 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.junit.Wait;
 import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(value = Parameterized.class)
 public class QueueControlTest extends ManagementTestBase {
 
    private ActiveMQServer server;
    private ClientSession session;
    private ServerLocator locator;
+   private final boolean durable;
+
+   @Parameterized.Parameters(name = "durable={0}")
+   public static Collection<Object[]> getParams() {
+      return Arrays.asList(new Object[][] {{true}, {false}});
+   }
+
+
+   /**
+    * @param durable
+    */
+   public QueueControlTest(boolean durable) {
+      super();
+      this.durable = durable;
+   }
+
 
    @Test
    public void testAttributes() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       SimpleString filter = new SimpleString("color = 'blue'");
-      boolean durable = RandomUtil.randomBoolean();
 
-      session.createQueue(address, queue, filter, durable);
+      session.createQueue(address, RoutingType.MULTICAST, queue, filter, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(queue.toString(), queueControl.getName());
@@ -97,7 +119,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(queue.toString(), queueControl.getName());
@@ -112,7 +134,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       final SimpleString deadLetterAddress = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertNull(queueControl.getDeadLetterAddress());
@@ -137,7 +159,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       String deadLetterAddress = RandomUtil.randomString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -155,7 +177,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       final SimpleString expiryAddress = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertNull(queueControl.getExpiryAddress());
@@ -180,7 +202,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       String expiryAddress = RandomUtil.randomString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -200,7 +222,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -220,7 +242,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -250,18 +272,18 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessageCount(queueControl));
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      producer.send(session.createMessage(durable));
+      assertMessageMetrics(queueControl, 1, durable);
 
       consumeMessages(1, session, queue);
 
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -271,7 +293,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessageCount(queueControl));
@@ -302,15 +324,15 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessagesAdded(queueControl));
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(1, getMessagesAdded(queueControl));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       consumeMessages(2, session, queue);
@@ -325,7 +347,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, false);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
@@ -351,13 +373,13 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getScheduledCount());
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
       producer.send(message);
 
@@ -366,13 +388,17 @@ public class QueueControlTest extends ManagementTestBase {
          Thread.sleep(100);
       }
 
-      Assert.assertEquals(1, queueControl.getScheduledCount());
+      assertScheduledMetrics(queueControl, 1, durable);
+      assertMessageMetrics(queueControl, 1, durable);
+
       consumeMessages(0, session, queue);
 
       Thread.sleep(delay * 2);
 
       Assert.assertEquals(0, queueControl.getScheduledCount());
       consumeMessages(1, session, queue);
+      assertMessageMetrics(queueControl, 0, durable);
+      assertScheduledMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -395,14 +421,14 @@ public class QueueControlTest extends ManagementTestBase {
          SimpleString address = RandomUtil.randomSimpleString();
          queue = RandomUtil.randomSimpleString();
 
-         transSession.createQueue(address, queue, null, false);
+         transSession.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
          final QueueControl queueControl = createManagementControl(address, queue);
 
          ClientProducer producer = transSession.createProducer(address);
 
          for (int i = 0; i < numMsg; i++) {
-            ClientMessage message = transSession.createMessage(false);
+            ClientMessage message = transSession.createMessage(durable);
             message.putIntProperty(new SimpleString("seqno"), i);
             producer.send(message);
          }
@@ -480,17 +506,17 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       Queue srvqueue = server.locateQueue(queue);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       ClientConsumer consumer = session.createConsumer(queue);
       session.start();
@@ -525,20 +551,22 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + delay);
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
       // unscheduled message
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Map<String, Object>[] messages = queueControl.listScheduledMessages();
       Assert.assertEquals(1, messages.length);
+      assertScheduledMetrics(queueControl, 1, durable);
+
       Assert.assertEquals(intValue, Integer.parseInt((messages[0].get("key")).toString()));
 
       Thread.sleep(delay + 500);
@@ -557,7 +585,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -567,7 +595,7 @@ public class QueueControlTest extends ManagementTestBase {
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
       // unscheduled message
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       String jsonString = queueControl.listScheduledMessagesAsJSON();
       Assert.assertNotNull(jsonString);
@@ -593,10 +621,10 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getDeliveringCount());
@@ -604,11 +632,11 @@ public class QueueControlTest extends ManagementTestBase {
       ClientConsumer consumer = session.createConsumer(queue);
       ClientMessage message = consumer.receive(500);
       Assert.assertNotNull(message);
-      Assert.assertEquals(1, queueControl.getDeliveringCount());
+      assertDeliveringMetrics(queueControl, 1, durable);
 
       message.acknowledge();
       session.commit();
-      Assert.assertEquals(0, queueControl.getDeliveringCount());
+      assertDeliveringMetrics(queueControl, 0, durable);
 
       consumer.close();
       session.deleteQueue(queue);
@@ -629,7 +657,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
 
       try {
-         session.createQueue(address, RoutingType.ANYCAST, queue, null, false);
+         session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
          for (int i = 0; i < THREAD_COUNT; i++) {
             producerExecutor.submit(() -> {
@@ -665,7 +693,8 @@ public class QueueControlTest extends ManagementTestBase {
          producerCountDown.await(30, TimeUnit.SECONDS);
          consumerCountDown.await(30, TimeUnit.SECONDS);
 
-         QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
+         QueueControl queueControl = createManagementControl(address, queue, RoutingType.MULTICAST);
+         Thread.sleep(200);
          Assert.assertEquals(0, queueControl.getMessageCount());
          Assert.assertEquals(0, queueControl.getConsumerCount());
          Assert.assertEquals(0, queueControl.getDeliveringCount());
@@ -694,12 +723,12 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
 
@@ -731,17 +760,18 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
+      assertMessageMetrics(queueControl, 2, durable);
       Map<String, Object>[] messages = queueControl.listMessages(filter);
       Assert.assertEquals(1, messages.length);
       Assert.assertEquals(matchingValue, Long.parseLong(messages[0].get("key").toString()));
@@ -750,6 +780,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       messages = queueControl.listMessages(filter);
       Assert.assertEquals(0, messages.length);
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -759,12 +790,12 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       Map<String, Object>[] messages = queueControl.listMessages(null);
       Assert.assertEquals(2, messages.length);
@@ -782,12 +813,12 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       Map<String, Object>[] messages = queueControl.listMessages("");
       Assert.assertEquals(2, messages.length);
@@ -810,14 +841,14 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
@@ -853,8 +884,8 @@ public class QueueControlTest extends ManagementTestBase {
       AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
       server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
 
-      session.createQueue(dla, dlq, null, false);
-      session.createQueue(adName, qName, null, false);
+      session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable);
+      session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
 
       // Send message to queue.
       ClientProducer producer = session.createProducer(adName);
@@ -874,7 +905,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertNull(clientMessage);
 
       QueueControl queueControl = createManagementControl(dla, dlq);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
       final long messageID = getFirstMessageId(queueControl);
 
       // Retry the message - i.e. it should go from DLQ to original Queue.
@@ -882,6 +913,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       // Assert DLQ is empty...
       Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       // .. and that the message is now on the original queue once more.
       clientMessage = clientConsumer.receive(500);
@@ -909,8 +941,8 @@ public class QueueControlTest extends ManagementTestBase {
       server.getAddressSettingsRepository().addMatch(forwardingAddress.toString(), addressSettings);
 
       // create target queue, DLQ and source topic
-      session.createQueue(dla, RoutingType.ANYCAST, dlq, null, false);
-      session.createQueue(forwardingAddress, RoutingType.ANYCAST, forwardingQueue, null, false);
+      session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable);
+      session.createQueue(forwardingAddress, RoutingType.MULTICAST, forwardingQueue, null, durable);
       session.createAddress(myTopic, RoutingType.MULTICAST, false);
 
       DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
@@ -935,15 +967,16 @@ public class QueueControlTest extends ManagementTestBase {
       clientMessage = clientConsumer.receiveImmediate();
       Assert.assertNull(clientMessage);
 
-      QueueControl queueControl = createManagementControl(dla, dlq, RoutingType.ANYCAST);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      QueueControl queueControl = createManagementControl(dla, dlq, RoutingType.MULTICAST);
+      assertMessageMetrics(queueControl, 1, durable);
+
       final long messageID = getFirstMessageId(queueControl);
 
       // Retry the message - i.e. it should go from DLQ to original Queue.
       Assert.assertTrue(queueControl.retryMessage(messageID));
 
       // Assert DLQ is empty...
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       // .. and that the message is now on the original queue once more.
       clientMessage = clientConsumer.receive(500);
@@ -970,8 +1003,8 @@ public class QueueControlTest extends ManagementTestBase {
       AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
       server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
 
-      session.createQueue(dla, dlq, null, false);
-      session.createQueue(adName, qName, null, false);
+      session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable);
+      session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
 
       // Send message to queue.
       ClientProducer producer = session.createProducer(adName);
@@ -1014,13 +1047,13 @@ public class QueueControlTest extends ManagementTestBase {
       assertTrue(queueMemorySize2.get() > 0);
 
       QueueControl dlqQueueControl = createManagementControl(dla, dlq);
-      Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
+      assertMessageMetrics(dlqQueueControl, numMessagesToTest, durable);
 
       // Retry all messages - i.e. they should go from DLQ to original Queue.
       Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages());
 
       // Assert DLQ is empty...
-      Assert.assertEquals(0, getMessageCount(dlqQueueControl));
+      assertMessageMetrics(dlqQueueControl, 0, durable);
 
       //Verify that original queue has a memory size of greater than 0 and DLQ is 0 after move
       assertTrue(queueMemorySize1.get() > 0);
@@ -1056,12 +1089,12 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString otherAddress = RandomUtil.randomSimpleString();
       SimpleString otherQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(otherAddress, otherQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+      session.createQueue(otherAddress, RoutingType.MULTICAST, otherQueue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       SimpleString key = RandomUtil.randomSimpleString();
       long value = RandomUtil.randomLong();
       message.putLongProperty(key, value);
@@ -1076,7 +1109,7 @@ public class QueueControlTest extends ManagementTestBase {
       AtomicInteger queueMemorySize = (AtomicInteger) queueMemorySizeField.get(q);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       //verify memory usage is greater than 0
       Assert.assertTrue(queueMemorySize.get() > 0);
@@ -1084,7 +1117,7 @@ public class QueueControlTest extends ManagementTestBase {
       // moved all messages to otherQueue
       int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString());
       Assert.assertEquals(1, movedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       //verify memory usage is 0 after move
       Assert.assertEquals(0, queueMemorySize.get());
@@ -1111,9 +1144,9 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queueB = new SimpleString("B");
       SimpleString queueC = new SimpleString("C");
 
-      server.createQueue(address, RoutingType.MULTICAST, queueA, null, true, false);
-      server.createQueue(address, RoutingType.MULTICAST, queueB, null, true, false);
-      server.createQueue(address, RoutingType.MULTICAST, queueC, null, true, false);
+      server.createQueue(address, RoutingType.MULTICAST, queueA, null, durable, false);
+      server.createQueue(address, RoutingType.MULTICAST, queueB, null, durable, false);
+      server.createQueue(address, RoutingType.MULTICAST, queueC, null, durable, false);
 
 
       QueueControl queueControlA = createManagementControl(address, queueA);
@@ -1174,18 +1207,18 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       SimpleString unknownQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       SimpleString key = RandomUtil.randomSimpleString();
       long value = RandomUtil.randomLong();
       message.putLongProperty(key, value);
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // moved all messages to unknown queue
       try {
@@ -1194,6 +1227,7 @@ public class QueueControlTest extends ManagementTestBase {
       } catch (Exception e) {
       }
       Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       consumeMessages(1, session, queue);
 
@@ -1208,7 +1242,6 @@ public class QueueControlTest extends ManagementTestBase {
     * <li>consume the message which <strong>did</strong> matches the filter from otherQueue</li>
     * </ol>
     */
-
    @Test
    public void testMoveMessagesWithFilter() throws Exception {
       SimpleString key = new SimpleString("key");
@@ -1220,20 +1253,20 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString otherAddress = RandomUtil.randomSimpleString();
       SimpleString otherQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(otherAddress, otherQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+      session.createQueue(otherAddress, RoutingType.MULTICAST,otherQueue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // moved matching messages to otherQueue
       int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue, otherQueue.toString());
@@ -1267,18 +1300,18 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString otherAddress = RandomUtil.randomSimpleString();
       SimpleString otherQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(otherAddress, otherQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+      session.createQueue(otherAddress, RoutingType.MULTICAST, otherQueue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
-      Assert.assertEquals(0, getMessageCount(otherQueueControl));
+      assertMessageMetrics(queueControl, 2, durable);
+      assertMessageMetrics(otherQueueControl, 0, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1287,8 +1320,8 @@ public class QueueControlTest extends ManagementTestBase {
 
       boolean moved = queueControl.moveMessage(messageID, otherQueue.toString());
       Assert.assertTrue(moved);
-      Assert.assertEquals(1, getMessageCount(queueControl));
-      Assert.assertEquals(1, getMessageCount(otherQueueControl));
+      assertMessageMetrics(queueControl, 1, durable);
+      assertMessageMetrics(otherQueueControl, 1, durable);
 
       consumeMessages(1, session, queue);
       consumeMessages(1, session, otherQueue);
@@ -1303,14 +1336,14 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       SimpleString unknownQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1347,19 +1380,19 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(key + " =" + matchingValue);
@@ -1391,24 +1424,24 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(5, key + " =" + matchingValue);
       Assert.assertEquals(1, removedMatchedMessagesCount);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -1431,20 +1464,20 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(null);
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -1454,20 +1487,20 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeAllMessages();
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -1477,20 +1510,20 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages("");
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -1500,15 +1533,15 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1518,7 +1551,7 @@ public class QueueControlTest extends ManagementTestBase {
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // check there is a single message to consume from queue
       consumeMessages(1, session, queue);
@@ -1531,15 +1564,15 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue, both scheduled
       long timeout = System.currentTimeMillis() + 5000;
-      ClientMessage m1 = session.createMessage(true);
+      ClientMessage m1 = session.createMessage(durable);
       m1.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout);
       producer.send(m1);
-      ClientMessage m2 = session.createMessage(true);
+      ClientMessage m2 = session.createMessage(durable);
       m2.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout);
       producer.send(m2);
 
@@ -1554,7 +1587,7 @@ public class QueueControlTest extends ManagementTestBase {
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(1, queueControl.getScheduledCount());
+      assertScheduledMetrics(queueControl, 1, durable);
 
       // check there is a single message to consume from queue
       while (timeout > System.currentTimeMillis() && queueControl.getScheduledCount() == 1) {
@@ -1571,14 +1604,14 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send messages on queue
 
       for (int i = 0; i < 100; i++) {
 
-         ClientMessage msg = session.createMessage(false);
+         ClientMessage msg = session.createMessage(durable);
          msg.putIntProperty("count", i);
          producer.send(msg);
       }
@@ -1592,7 +1625,7 @@ public class QueueControlTest extends ManagementTestBase {
       }
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(100, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 100, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1604,7 +1637,7 @@ public class QueueControlTest extends ManagementTestBase {
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(99, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 99, durable);
 
       cons.close();
 
@@ -1623,13 +1656,13 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(matchingMessage);
       producer.send(unmatchingMessage);
@@ -1637,6 +1670,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(3, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 3, durable);
 
       Assert.assertEquals(2, queueControl.countMessages(key + " =" + matchingValue));
       Assert.assertEquals(1, queueControl.countMessages(key + " =" + unmatchingValue));
@@ -1653,18 +1687,18 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
       for (int i = 0; i < 100; i++) {
-         ClientMessage msg = session.createMessage(false);
+         ClientMessage msg = session.createMessage(durable);
          msg.putStringProperty(key, SimpleString.toSimpleString(matchingValue));
          producer.send(msg);
       }
 
       for (int i = 0; i < 10; i++) {
-         ClientMessage msg = session.createMessage(false);
+         ClientMessage msg = session.createMessage(durable);
          msg.putStringProperty(key, SimpleString.toSimpleString(nonMatchingValue));
          producer.send(msg);
       }
@@ -1679,7 +1713,7 @@ public class QueueControlTest extends ManagementTestBase {
       assertNull(consumer.receiveImmediate());
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(110, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 110, durable);
 
       Assert.assertEquals(0, queueControl.countMessages("nonExistentProperty like \'%Temp/88\'"));
 
@@ -1700,14 +1734,14 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
@@ -1716,7 +1750,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       int expiredMessagesCount = queueControl.expireMessages(key + " =" + matchingValue);
       Assert.assertEquals(1, expiredMessagesCount);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -1742,17 +1776,17 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString expiryAddress = RandomUtil.randomSimpleString();
       SimpleString expiryQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(expiryAddress, expiryQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+      session.createQueue(expiryAddress, RoutingType.MULTICAST, expiryQueue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl expiryQueueControl = createManagementControl(expiryAddress, expiryQueue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
-      Assert.assertEquals(0, getMessageCount(expiryQueueControl));
+      assertMessageMetrics(queueControl, 1, durable);
+      assertMessageMetrics(expiryQueueControl, 0, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1764,8 +1798,9 @@ public class QueueControlTest extends ManagementTestBase {
 
       boolean expired = queueControl.expireMessage(messageID);
       Assert.assertTrue(expired);
-      Assert.assertEquals(0, getMessageCount(queueControl));
-      Assert.assertEquals(1, getMessageCount(expiryQueueControl));
+      Thread.sleep(200);
+      assertMessageMetrics(queueControl, 0, durable);
+      assertMessageMetrics(expiryQueueControl, 1, durable);
 
       consumeMessages(0, session, queue);
       consumeMessages(1, session, expiryQueue);
@@ -1782,17 +1817,17 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString deadLetterAddress = RandomUtil.randomSimpleString();
       SimpleString deadLetterQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(deadLetterAddress, deadLetterQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+      session.createQueue(deadLetterAddress, RoutingType.MULTICAST, deadLetterQueue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl deadLetterQueueControl = createManagementControl(deadLetterAddress, deadLetterQueue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1805,8 +1840,9 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(0, getMessageCount(deadLetterQueueControl));
       boolean movedToDeadLetterAddress = queueControl.sendMessageToDeadLetterAddress(messageID);
       Assert.assertTrue(movedToDeadLetterAddress);
-      Assert.assertEquals(1, getMessageCount(queueControl));
-      Assert.assertEquals(1, getMessageCount(deadLetterQueueControl));
+      assertMessageMetrics(queueControl, 1, durable);
+      Thread.sleep(200);
+      assertMessageMetrics(deadLetterQueueControl, 1, durable);
 
       // check there is a single message to consume from queue
       consumeMessages(1, session, queue);
@@ -1826,10 +1862,10 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.setPriority(originalPriority);
       producer.send(message);
 
@@ -1860,10 +1896,10 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
@@ -1894,7 +1930,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -1908,7 +1944,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(0, info.getCount());
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Thread.sleep(200);
       jsonString = queueControl.listMessageCounter();
@@ -1918,7 +1954,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(1, info.getCount());
       Assert.assertEquals(1, info.getCountDelta());
 
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Thread.sleep(200);
       jsonString = queueControl.listMessageCounter();
@@ -1946,7 +1982,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -1960,7 +1996,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(0, info.getCount());
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
       jsonString = queueControl.listMessageCounter();
@@ -1998,7 +2034,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       String history = queueControl.listMessageCounterAsHTML();
@@ -2013,7 +2049,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -2033,7 +2069,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -2049,8 +2085,8 @@ public class QueueControlTest extends ManagementTestBase {
 
    @Test
    public void testMoveMessagesBack() throws Exception {
-      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false);
-      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false);
+      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, durable, false);
+      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, durable, false);
 
       ServerLocator locator = createInVMNonHALocator();
 
@@ -2061,7 +2097,7 @@ public class QueueControlTest extends ManagementTestBase {
       ClientProducer prod1 = session.createProducer("q1");
 
       for (int i = 0; i < 10; i++) {
-         ClientMessage msg = session.createMessage(true);
+         ClientMessage msg = session.createMessage(durable);
 
          msg.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
 
@@ -2113,8 +2149,8 @@ public class QueueControlTest extends ManagementTestBase {
 
    @Test
    public void testMoveMessagesBack2() throws Exception {
-      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, true, false);
-      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, true, false);
+      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new SimpleString("q1"), null, durable, false);
+      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new SimpleString("q2"), null, durable, false);
 
       ServerLocator locator = createInVMNonHALocator();
 
@@ -2127,7 +2163,7 @@ public class QueueControlTest extends ManagementTestBase {
       int NUMBER_OF_MSGS = 10;
 
       for (int i = 0; i < NUMBER_OF_MSGS; i++) {
-         ClientMessage msg = session.createMessage(true);
+         ClientMessage msg = session.createMessage(durable);
 
          msg.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
 
@@ -2191,7 +2227,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
 
       try {
-         session.createQueue(address, queue, null, false);
+         session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
          QueueControl queueControl = createManagementControl(address, queue);
 
          ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -2203,7 +2239,6 @@ public class QueueControlTest extends ManagementTestBase {
          queueControl.resume();
          Assert.assertFalse(queueControl.isPaused());
       } catch (Exception e) {
-         // TODO Auto-generated catch block
          e.printStackTrace();
       }
    }
@@ -2213,15 +2248,15 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessagesAdded(queueControl));
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(1, getMessagesAdded(queueControl));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       consumeMessages(2, session, queue);
@@ -2240,16 +2275,16 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       consumeMessages(1, session, queue);
       Assert.assertEquals(1, queueControl.getMessagesAcknowledged());
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       consumeMessages(1, session, queue);
       Assert.assertEquals(2, queueControl.getMessagesAcknowledged());
 
@@ -2265,13 +2300,13 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesExpired());
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       producer.send(message);
 
       // the message IDs are set on the server
@@ -2282,7 +2317,7 @@ public class QueueControlTest extends ManagementTestBase {
       queueControl.expireMessage(messageID);
       Assert.assertEquals(1, queueControl.getMessagesExpired());
 
-      message = session.createMessage(false);
+      message = session.createMessage(durable);
       producer.send(message);
 
       // the message IDs are set on the server
@@ -2305,13 +2340,13 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesExpired());
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       producer.send(message);
 
       // the message IDs are set on the server
@@ -2321,6 +2356,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       queueControl.sendMessageToDeadLetterAddress(messageID);
       Assert.assertEquals(1, queueControl.getMessagesKilled());
+      assertMessageMetrics(queueControl, 0, durable);
 
       message = session.createMessage(false);
       producer.send(message);
@@ -2354,7 +2390,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       SimpleString testQueueName = new SimpleString("newQueue");
       String testQueueName2 = "newQueue2";
-      this.server.createQueue(testQueueName, RoutingType.ANYCAST, testQueueName, null, false, false);
+      this.server.createQueue(testQueueName, RoutingType.MULTICAST, testQueueName, null, durable, false);
 
       Notification notif = listener.getNotification();
 
@@ -2369,7 +2405,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       ActiveMQServerControl control = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
 
-      control.createQueue(testQueueName2, testQueueName2);
+      control.createQueue(testQueueName2, testQueueName2, RoutingType.MULTICAST.toString());
 
       notif = listener.getNotification();
       System.out.println("got notif: " + notif);
@@ -2387,7 +2423,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -2438,4 +2474,46 @@ public class QueueControlTest extends ManagementTestBase {
       JsonObject object = (JsonObject) array.get(0);
       return object.getJsonNumber("messageID").longValue();
    }
+
+   protected void assertMessageMetrics(final QueueControl queueControl, long messageCount, boolean durable) throws Exception {
+      assertMetrics(queueControl, messageCount, durable, queueControl::getMessageCount,
+            queueControl::getPersistentSize, queueControl::getDurableMessageCount, queueControl::getDurablePersistentSize);
+   }
+
+   protected void assertScheduledMetrics(final QueueControl queueControl, long messageCount, boolean durable) throws Exception {
+      assertMetrics(queueControl, messageCount, durable, queueControl::getScheduledCount,
+            queueControl::getScheduledSize, queueControl::getDurableScheduledCount, queueControl::getDurableScheduledSize);
+   }
+
+   protected void assertDeliveringMetrics(final QueueControl queueControl, long messageCount, boolean durable) throws Exception {
+      assertMetrics(queueControl, messageCount, durable, queueControl::getDeliveringCount,
+            queueControl::getDeliveringSize, queueControl::getDurableDeliveringCount, queueControl::getDurableDeliveringSize);
+   }
+
+   protected void assertMetrics(final QueueControl queueControl, long messageCount, boolean durable,
+         Supplier<Number> count, Supplier<Number> size,
+         Supplier<Number>durableCount, Supplier<Number> durableSize) throws Exception {
+
+      //make sure count stat equals message count
+      Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == messageCount, 3, 100));
+
+      if (messageCount > 0) {
+         //verify size stat greater than 0
+         Assert.assertTrue(Wait.waitFor(() -> size.get().longValue() > 0, 3, 100));
+
+         //If durable then make sure durable count and size are correct
+         if (durable) {
+            Assert.assertTrue(Wait.waitFor(() -> durableCount.get().longValue() == messageCount, 3, 100));
+            Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() > 0, 3, 100));
+         } else {
+            Assert.assertTrue(Wait.waitFor(() -> durableCount.get().longValue() == 0, 3, 100));
+            Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() == 0, 3, 100));
+         }
+      } else {
+         Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == 0, 3, 100));
+         Assert.assertTrue(Wait.waitFor(() -> durableCount.get().longValue() == 0, 3, 100));
+         Assert.assertTrue(Wait.waitFor(() -> size.get().longValue() == 0, 3, 100));
+         Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() == 0, 3, 100));
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 09621af..aafbb5b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -16,16 +16,24 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
-import javax.management.openmbean.CompositeData;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.management.openmbean.CompositeData;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(value = Parameterized.class)
 public class QueueControlUsingCoreTest extends QueueControlTest {
 
+   public QueueControlUsingCoreTest(boolean durable) {
+      super(durable);
+   }
+
    @Override
    protected QueueControl createManagementControl(final SimpleString address,
                                                   final SimpleString queue) throws Exception {
@@ -117,6 +125,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public long getDeliveringSize() {
+            return (Long) proxy.retrieveAttributeValue("deliveringSize", Long.class);
+         }
+
+         @Override
+         public int getDurableDeliveringCount() {
+            return (Integer) proxy.retrieveAttributeValue("durableDeliveringCount", Integer.class);
+         }
+
+         @Override
+         public long getDurableDeliveringSize() {
+            return (Long) proxy.retrieveAttributeValue("durableDeliveringSize", Long.class);
+         }
+
+         @Override
          public String getExpiryAddress() {
             return (String) proxy.retrieveAttributeValue("expiryAddress");
          }
@@ -187,6 +210,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public long getScheduledSize() {
+            return (Long) proxy.retrieveAttributeValue("scheduledSize", Long.class);
+         }
+
+         @Override
+         public long getDurableScheduledCount() {
+            return (Long) proxy.retrieveAttributeValue("durableScheduledCount", Long.class);
+         }
+
+         @Override
+         public long getDurableScheduledSize() {
+            return (Long) proxy.retrieveAttributeValue("durableScheduledSize", Long.class);
+         }
+
+         @Override
          public boolean isDurable() {
             return (Boolean) proxy.retrieveAttributeValue("durable");
          }
@@ -455,6 +493,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          public String listDeliveringMessagesAsJSON() throws Exception {
             return (String) proxy.invokeOperation("listDeliveringMessagesAsJSON");
          }
+
+         @Override
+         public long getPersistentSize() {
+            return (Long) proxy.retrieveAttributeValue("persistentSize", Long.class);
+         }
+
+         @Override
+         public long getDurableMessageCount() {
+            return (Long) proxy.retrieveAttributeValue("durableMessageCount", Long.class);
+         }
+
+         @Override
+         public long getDurablePersistentSize() {
+            return (Long) proxy.retrieveAttributeValue("durablePersistentSize", Long.class);
+         }
       };
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index 6f47ba7..4714580 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -89,15 +89,17 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          Transaction tx = new TransactionImpl(server.getStorageManager());
 
-         counter.increment(tx, 1);
+         counter.increment(tx, 1, 1000);
 
          assertEquals(0, counter.getValue());
+         assertEquals(0, counter.getPersistentSize());
 
          tx.commit();
 
          storage.waitOnOperations();
 
          assertEquals(1, counter.getValue());
+         assertEquals(1000, counter.getPersistentSize());
       } finally {
          sf.close();
          session.close();
@@ -121,7 +123,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          for (int i = 0; i < 2100; i++) {
 
-            counter.increment(tx, 1);
+            counter.increment(tx, 1, 1000);
 
             if (i % 200 == 0) {
                tx.commit();
@@ -129,6 +131,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
                storage.waitOnOperations();
 
                assertEquals(i + 1, counter.getValue());
+               assertEquals((i + 1) * 1000, counter.getPersistentSize());
 
                tx = new TransactionImpl(server.getStorageManager());
             }
@@ -139,6 +142,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          storage.waitOnOperations();
 
          assertEquals(2100, counter.getValue());
+         assertEquals(2100 * 1000, counter.getPersistentSize());
 
          server.stop();
 
@@ -153,6 +157,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          counter = locateCounter(queue);
 
          assertEquals(2100, counter.getValue());
+         assertEquals(2100 * 1000, counter.getPersistentSize());
 
       } finally {
          sf.close();
@@ -180,7 +185,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          for (int i = 0; i < 2100; i++) {
 
-            counter.increment(tx, 1);
+            counter.increment(tx, 1, 1000);
 
             if (i % 200 == 0) {
                tx.commit();
@@ -188,6 +193,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
                storage.waitOnOperations();
 
                assertEquals(i + 1, counter.getValue());
+               assertEquals((i + 1) * 1000, counter.getPersistentSize());
 
                tx = new TransactionImpl(server.getStorageManager());
             }
@@ -198,6 +204,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          storage.waitOnOperations();
 
          assertEquals(2100, counter.getValue());
+         assertEquals(2100 * 1000, counter.getPersistentSize());
 
          server.stop();
 
@@ -212,6 +219,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          counter = locateCounter(queue);
 
          assertEquals(0, counter.getValue());
+         assertEquals(0, counter.getPersistentSize());
 
       } finally {
          sf.close();
@@ -230,15 +238,17 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       Transaction tx = new TransactionImpl(server.getStorageManager());
 
-      counter.increment(tx, 1);
+      counter.increment(tx, 1, 1000);
 
       assertEquals(0, counter.getValue());
+      assertEquals(0, counter.getPersistentSize());
 
       tx.commit();
 
       storage.waitOnOperations();
 
       assertEquals(1, counter.getValue());
+      assertEquals(1000, counter.getPersistentSize());
 
       sl.close();
 
@@ -255,6 +265,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
       counter = locateCounter(queue);
 
       assertEquals(1, counter.getValue());
+      assertEquals(1000, counter.getPersistentSize());
 
    }
 
@@ -283,7 +294,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
       Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 300);
 
       for (int i = 0; i < 2000; i++) {
-         counter.increment(tx, 1);
+         counter.increment(tx, 1, 1000);
       }
 
       assertEquals(0, counter.getValue());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java
new file mode 100644
index 0000000..2247111
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence.metrics;
+
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ */
+public abstract class AbstractPersistentStatTestSupport extends JMSTestBase {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractPersistentStatTestSupport.class);
+
+   protected static int defaultMessageSize = 1000;
+
+   @Override
+   protected boolean usePersistence() {
+      return true;
+   }
+
+   protected void consumeTestQueueMessages(String queueName, int num) throws Exception {
+
+      // Start the connection
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId2" + queueName);
+      connection.start();
+      Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+      MessageConsumer consumer;
+      try {
+         consumer = session.createConsumer(queue);
+         for (int i = 0; i < num; i++) {
+            consumer.receive();
+         }
+         consumer.close();
+      } finally {
+         // consumer.close();
+         connection.close();
+      }
+
+   }
+
+   protected void browseTestQueueMessages(String queueName) throws Exception {
+      // Start the connection
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId2" + queueName);
+      connection.start();
+      Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+
+      try {
+         QueueBrowser queueBrowser = session.createBrowser(queue);
+         @SuppressWarnings("unchecked")
+         Enumeration<Message> messages = queueBrowser.getEnumeration();
+         while (messages.hasMoreElements()) {
+            messages.nextElement();
+         }
+
+      } finally {
+         connection.close();
+      }
+
+   }
+
+   protected void consumeDurableTestMessages(Connection connection, String sub, int size, String topicName,
+         AtomicLong publishedMessageSize) throws Exception {
+
+
+      Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+      Topic topic = session.createTopic(topicName);
+
+      try {
+         TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
+         for (int i = 0; i < size; i++) {
+            ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
+            if (publishedMessageSize != null) {
+               publishedMessageSize.addAndGet(-message.getCoreMessage().getEncodeSize());
+            }
+         }
+
+      } finally {
+         session.close();
+      }
+
+   }
+
+   protected void publishTestQueueMessages(int count, String queueName, int deliveryMode, int messageSize,
+         AtomicLong publishedMessageSize, boolean transacted) throws Exception {
+
+      // Start the connection
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId" + queueName);
+      connection.start();
+      Session session = transacted ? connection.createSession(transacted, QueueSession.SESSION_TRANSACTED) :
+         connection.createSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+
+      try {
+         MessageProducer prod = session.createProducer(queue);
+         prod.setDeliveryMode(deliveryMode);
+         for (int i = 0; i < count; i++) {
+            prod.send(createMessage(i, session, messageSize, publishedMessageSize));
+         }
+
+         if (transacted) {
+            session.commit();
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   protected void publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+         int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, boolean verifyBrowsing,
+         boolean shared)
+         throws Exception {
+      this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
+            publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT, shared);
+   }
+
+   protected void publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+         int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, boolean verifyBrowsing,
+         int deliveryMode, boolean shared) throws Exception {
+
+      Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+      Topic topic = session.createTopic(topicName);
+      for (String subName : subNames) {
+         if (shared) {
+            session.createSharedDurableConsumer(topic, subName);
+         } else {
+            session.createDurableSubscriber(topic, subName);
+         }
+      }
+
+      try {
+         // publish a bunch of non-persistent messages to fill up the temp
+         // store
+         MessageProducer prod = session.createProducer(topic);
+         prod.setDeliveryMode(deliveryMode);
+         for (int i = 0; i < publishSize; i++) {
+            prod.send(createMessage(i, session, messageSize, publishedMessageSize));
+         }
+
+      } finally {
+         session.close();
+      }
+
+   }
+
+   /**
+    * Generate random messages between 100 bytes and maxMessageSize
+    *
+    * @param session
+    * @return
+    * @throws JMSException
+    * @throws ActiveMQException
+    */
+   protected BytesMessage createMessage(int count, Session session, int maxMessageSize, AtomicLong publishedMessageSize)
+         throws JMSException, ActiveMQException {
+      final ActiveMQBytesMessage message = (ActiveMQBytesMessage) session.createBytesMessage();
+
+      final Random randomSize = new Random();
+      int size = randomSize.nextInt((maxMessageSize - 100) + 1) + 100;
+      final byte[] data = new byte[size];
+      final Random rng = new Random();
+      rng.nextBytes(data);
+      message.writeBytes(data);
+      if (publishedMessageSize != null) {
+         publishedMessageSize.addAndGet(message.getCoreMessage().getPersistentSize());
+      }
+
+      return message;
+   }
+}

Reply | Threaded
Open this post in threaded view
|

[3/6] activemq-artemis git commit: ARTEMIS-1663 - Add new message count and size metrics

clebertsuconic-2
In reply to this post by clebertsuconic-2
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 2c4db3e..1c4038b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -722,7 +722,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             refs.remove(message.getMessageID());
 
             // The delivering count should also be decreased as to avoid inconsistencies
-            ((QueueImpl) ref.getQueue()).decDelivering();
+            ((QueueImpl) ref.getQueue()).decDelivering(ref);
          }
 
          connectionFailed(e, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 90b8814..2620cf9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -116,7 +117,7 @@ public class LastValueQueue extends QueueImpl {
             } else {
                // We keep the current ref and ack the one we are returning
 
-               super.referenceHandled();
+               super.referenceHandled(ref);
 
                try {
                   super.acknowledge(ref);
@@ -139,7 +140,7 @@ public class LastValueQueue extends QueueImpl {
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
-      referenceHandled();
+      referenceHandled(ref);
 
       try {
          oldRef.acknowledge();
@@ -323,6 +324,11 @@ public class LastValueQueue extends QueueImpl {
       public Long getConsumerId() {
          return this.consumerId;
       }
+
+      @Override
+      public long getPersistentSize() throws ActiveMQException {
+         return ref.getPersistentSize();
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 7543ba5..2802740 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -158,7 +159,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    @Override
    public void handled() {
-      queue.referenceHandled();
+      queue.referenceHandled(this);
    }
 
    @Override
@@ -239,4 +240,9 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    public int hashCode() {
       return this.getMessage().hashCode();
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return this.getMessage().getPersistentSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index cfd06d9..89ea2fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -364,16 +365,25 @@ public class PostOfficeJournalLoader implements JournalLoader {
 
                List<PagedMessage> pgMessages = pg.read(storageManager);
                Map<Long, AtomicInteger> countsPerQueueOnPage = new HashMap<>();
+               Map<Long, AtomicLong> sizePerQueueOnPage = new HashMap<>();
 
                for (PagedMessage pgd : pgMessages) {
                   if (pgd.getTransactionID() <= 0) {
                      for (long q : pgd.getQueueIDs()) {
                         AtomicInteger countQ = countsPerQueueOnPage.get(q);
+                        AtomicLong sizeQ = sizePerQueueOnPage.get(q);
                         if (countQ == null) {
                            countQ = new AtomicInteger(0);
                            countsPerQueueOnPage.put(q, countQ);
                         }
+                        if (sizeQ == null) {
+                           sizeQ = new AtomicLong(0);
+                           sizePerQueueOnPage.put(q, sizeQ);
+                        }
                         countQ.incrementAndGet();
+                        if (pgd.getPersistentSize() > 0) {
+                           sizeQ.addAndGet(pgd.getPersistentSize());
+                        }
                      }
                   }
                }
@@ -387,12 +397,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
                   PageSubscriptionCounter counter = store.getCursorProvider().getSubscription(entry.getKey()).getCounter();
 
                   AtomicInteger value = countsPerQueueOnPage.get(entry.getKey());
+                  AtomicLong sizeValue = sizePerQueueOnPage.get(entry.getKey());
 
                   if (value == null) {
                      logger.debug("Page " + entry.getKey() + " wasn't open, so we will just ignore");
                   } else {
                      logger.debug("Replacing counter " + value.get());
-                     counter.increment(txRecoverCounter, value.get());
+                     counter.increment(txRecoverCounter, value.get(), sizeValue.get());
                   }
                }
             } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index dbf79e2..5530179 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -171,6 +171,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
    private final AtomicInteger queueMemorySize = new AtomicInteger(0);
 
+   private final QueuePendingMessageMetrics pendingMetrics = new QueuePendingMessageMetrics(this);
+
+   private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
+
    // used to control if we should recalculate certain positions inside deliverAsync
    private volatile boolean consumersChanged = true;
 
@@ -186,8 +190,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private AtomicLong messagesKilled = new AtomicLong(0);
 
-   protected final AtomicInteger deliveringCount = new AtomicInteger(0);
-
    private boolean paused;
 
    private long pauseStatusRecord = -1;
@@ -452,7 +454,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       this.server = server;
 
-      scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
+      scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
 
       if (addressSettingsRepository != null) {
          addressSettingsRepositoryListener = new AddressSettingsRepositoryListener();
@@ -1118,10 +1120,45 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (pageSubscription != null) {
          // messageReferences will have depaged messages which we need to discount from the counter as they are
          // counted on the pageSubscription as well
-         return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
+         return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+      } else {
+         return pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount();
+      }
+   }
+
+   @Override
+   public long getPersistentSize() {
+      if (pageSubscription != null) {
+         // messageReferences will have depaged messages which we need to discount from the counter as they are
+         // counted on the pageSubscription as well
+         return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize() + pageSubscription.getPersistentSize();
       } else {
-         return messageReferences.size() + getScheduledCount() + deliveringCount.get();
+         return pendingMetrics.getPersistentSize() + getScheduledSize() + getDeliveringSize();
+      }
+   }
+
+   @Override
+   public long getDurableMessageCount() {
+      if (isDurable()) {
+         if (pageSubscription != null) {
+            return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount() + pageSubscription.getMessageCount();
+         } else {
+            return pendingMetrics.getDurableMessageCount() + getDurableScheduledCount() + getDurableDeliveringCount();
+         }
+      }
+      return 0;
+   }
+
+   @Override
+   public long getDurablePersistentSize() {
+      if (isDurable()) {
+         if (pageSubscription != null) {
+            return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize() + pageSubscription.getPersistentSize();
+         } else {
+            return pendingMetrics.getDurablePersistentSize() + getDurableScheduledSize() + getDurableDeliveringSize();
+         }
       }
+      return 0;
    }
 
    @Override
@@ -1130,6 +1167,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public synchronized long getScheduledSize() {
+      return scheduledDeliveryHandler.getScheduledSize();
+   }
+
+   @Override
+   public synchronized int getDurableScheduledCount() {
+      return scheduledDeliveryHandler.getDurableScheduledCount();
+   }
+
+   @Override
+   public synchronized long getDurableScheduledSize() {
+      return scheduledDeliveryHandler.getDurableScheduledSize();
+   }
+
+   @Override
    public synchronized List<MessageReference> getScheduledMessages() {
       return scheduledDeliveryHandler.getScheduledReferences();
    }
@@ -1153,7 +1205,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public int getDeliveringCount() {
-      return deliveringCount.get();
+      return deliveringMetrics.getMessageCount();
+   }
+
+   @Override
+   public long getDeliveringSize() {
+      return deliveringMetrics.getPersistentSize();
+   }
+
+   @Override
+   public int getDurableDeliveringCount() {
+      return deliveringMetrics.getDurableMessageCount();
+   }
+
+   @Override
+   public long getDurableDeliveringSize() {
+      return deliveringMetrics.getDurablePersistentSize();
    }
 
    @Override
@@ -1239,7 +1306,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       getRefsOperation(tx).addAck(ref);
 
       // https://issues.jboss.org/browse/HORNETQ-609
-      incDelivering();
+      incDelivering(ref);
 
       messagesAcknowledged.incrementAndGet();
    }
@@ -1287,7 +1354,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          resetAllIterators();
       } else {
-         decDelivering();
+         decDelivering(reference);
       }
    }
 
@@ -1354,8 +1421,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void referenceHandled() {
-      incDelivering();
+   public void referenceHandled(MessageReference ref) {
+      incDelivering(ref);
    }
 
    @Override
@@ -1419,7 +1486,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return iterQueue(flushLimit, filter1, new QueueIterateAction() {
          @Override
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
-            incDelivering();
+            incDelivering(ref);
             acknowledge(tx, ref, ackReason);
             refRemoved(ref);
          }
@@ -1539,7 +1606,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (ref.getMessage().getMessageID() == messageID) {
-               incDelivering();
+               incDelivering(ref);
                acknowledge(tx, ref);
                iter.remove();
                refRemoved(ref);
@@ -1618,7 +1685,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (ref.getMessage().getMessageID() == messageID) {
-               incDelivering();
+               incDelivering(ref);
                expire(ref);
                iter.remove();
                refRemoved(ref);
@@ -1644,7 +1711,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (filter == null || filter.match(ref.getMessage())) {
-               incDelivering();
+               incDelivering(ref);
                expire(tx, ref);
                iter.remove();
                refRemoved(ref);
@@ -1711,7 +1778,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                         if (tx == null) {
                            tx = new TransactionImpl(storageManager);
                         }
-                        incDelivering();
+                        incDelivering(ref);
                         expired = true;
                         expire(tx, ref);
                         iter.remove();
@@ -1763,7 +1830,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (ref.getMessage().getMessageID() == messageID) {
-               incDelivering();
+               incDelivering(ref);
                sendToDeadLetterAddress(null, ref);
                iter.remove();
                refRemoved(ref);
@@ -1782,7 +1849,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          while (iter.hasNext()) {
             MessageReference ref = iter.next();
             if (filter == null || filter.match(ref.getMessage())) {
-               incDelivering();
+               incDelivering(ref);
                sendToDeadLetterAddress(null, ref);
                iter.remove();
                refRemoved(ref);
@@ -1804,11 +1871,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             if (ref.getMessage().getMessageID() == messageID) {
                iter.remove();
                refRemoved(ref);
-               incDelivering();
+               incDelivering(ref);
                try {
                   move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL);
                } catch (Exception e) {
-                  decDelivering();
+                  decDelivering(ref);
                   throw e;
                }
                return true;
@@ -1836,7 +1903,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
             boolean ignored = false;
 
-            incDelivering();
+            incDelivering(ref);
 
             if (rejectDuplicates) {
                byte[] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
@@ -1881,7 +1948,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             if (originalMessageAddress != null) {
 
-               incDelivering();
+               incDelivering(ref);
 
                Long targetQueue = null;
                if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
@@ -2065,6 +2132,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private synchronized void internalAddTail(final MessageReference ref) {
       refAdded(ref);
       messageReferences.addTail(ref, getPriority(ref));
+      pendingMetrics.incrementMetrics(ref);
    }
 
    /**
@@ -2076,6 +2144,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     */
    private void internalAddHead(final MessageReference ref) {
       queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+      pendingMetrics.incrementMetrics(ref);
       refAdded(ref);
 
       int priority = getPriority(ref);
@@ -2330,6 +2399,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    protected void refRemoved(MessageReference ref) {
       queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
+      pendingMetrics.decrementMetrics(ref);
       if (ref.isPaged()) {
          pagedReferences.decrementAndGet();
       }
@@ -2379,6 +2449,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          }
          addTail(reference, false);
          pageIterator.remove();
+
+         //We have to increment this here instead of in the iterator so we have access to the reference from next()
+         pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
       }
 
       if (logger.isDebugEnabled()) {
@@ -2387,7 +2460,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          }
 
          if (logger.isDebugEnabled()) {
-            logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringCount.get());
+            logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount());
 
          }
       }
@@ -2466,7 +2539,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
          }
 
-         decDelivering();
+         decDelivering(reference);
 
          return true;
       }
@@ -2890,7 +2963,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void postAcknowledge(final MessageReference ref) {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
-      queue.decDelivering();
+      queue.decDelivering(ref);
 
       if (ref.isPaged()) {
          // nothing to be done
@@ -2958,7 +3031,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       try {
          Transaction transaction = new TransactionImpl(storageManager);
          for (MessageReference reference : refs) {
-            incDelivering(); // post ack will decrement this, so need to inc
+            incDelivering(reference); // post ack will decrement this, so need to inc
             acknowledge(transaction, reference, AckReason.KILLED);
          }
          transaction.commit();
@@ -3264,17 +3337,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private int incDelivering() {
-      return deliveringCount.incrementAndGet();
+   private void incDelivering(MessageReference ref) {
+      deliveringMetrics.incrementMetrics(ref);
    }
 
-   public void decDelivering() {
-      deliveringCount.decrementAndGet();
+   public void decDelivering(final MessageReference reference) {
+      deliveringMetrics.decrementMetrics(reference);
    }
 
-   @Override
-   public void decDelivering(int size) {
-      deliveringCount.addAndGet(-size);
+   private long getPersistentSize(final MessageReference reference) {
+      long size = 0;
+
+      try {
+         size = reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0;
+      } catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
+      }
+
+      return size;
    }
 
    private void configureExpiry(final AddressSettings settings) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
new file mode 100644
index 0000000..f6d65d4
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+
+public class QueuePendingMessageMetrics {
+
+   private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> COUNT_UPDATER =
+         AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "messageCount");
+
+   private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> DURABLE_COUNT_UPDATER =
+         AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durableMessageCount");
+
+   private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> SIZE_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "persistentSize");
+
+   private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> DURABLE_SIZE_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durablePersistentSize");
+
+   private volatile int messageCount;
+
+   private volatile long persistentSize;
+
+   private volatile int durableMessageCount;
+
+   private volatile long durablePersistentSize;
+
+   private final Queue queue;
+
+   public QueuePendingMessageMetrics(final Queue queue) {
+      Preconditions.checkNotNull(queue);
+      this.queue = queue;
+   }
+
+   public void incrementMetrics(final MessageReference reference) {
+      long size = getPersistentSize(reference);
+      COUNT_UPDATER.incrementAndGet(this);
+      SIZE_UPDATER.addAndGet(this, size);
+      if (queue.isDurable() && reference.getMessage().isDurable()) {
+         DURABLE_COUNT_UPDATER.incrementAndGet(this);
+         DURABLE_SIZE_UPDATER.addAndGet(this, size);
+      }
+   }
+
+   public void decrementMetrics(final MessageReference reference) {
+      long size = -getPersistentSize(reference);
+      COUNT_UPDATER.decrementAndGet(this);
+      SIZE_UPDATER.addAndGet(this, size);
+      if (queue.isDurable() && reference.getMessage().isDurable()) {
+         DURABLE_COUNT_UPDATER.decrementAndGet(this);
+         DURABLE_SIZE_UPDATER.addAndGet(this, size);
+      }
+   }
+
+
+
+   /**
+    * @return the messageCount
+    */
+   public int getMessageCount() {
+      return messageCount;
+   }
+
+   /**
+    * @param messageCount the messageCount to set
+    */
+   public void setMessageCount(int messageCount) {
+      this.messageCount = messageCount;
+   }
+
+   /**
+    * @return the persistentSize
+    */
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
+   /**
+    * @param persistentSize the persistentSize to set
+    */
+   public void setPersistentSize(long persistentSize) {
+      this.persistentSize = persistentSize;
+   }
+
+   /**
+    * @return the durableMessageCount
+    */
+   public int getDurableMessageCount() {
+      return durableMessageCount;
+   }
+
+   /**
+    * @param durableMessageCount the durableMessageCount to set
+    */
+   public void setDurableMessageCount(int durableMessageCount) {
+      this.durableMessageCount = durableMessageCount;
+   }
+
+   /**
+    * @return the durablePersistentSize
+    */
+   public long getDurablePersistentSize() {
+      return durablePersistentSize;
+   }
+
+   /**
+    * @param durablePersistentSize the durablePersistentSize to set
+    */
+   public void setDurablePersistentSize(long durablePersistentSize) {
+      this.durablePersistentSize = durablePersistentSize;
+   }
+
+   private long getPersistentSize(final MessageReference reference) {
+      long size = 0;
+
+      try {
+         size = reference.getPersistentSize() > 0 ? reference.getPersistentSize() : 0;
+      } catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
+      }
+
+      return size;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index 6eaba4c..78ec785 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -50,8 +50,12 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
    // just adding some information to keep it in order accordingly to the initial operations
    private final TreeSet<RefScheduled> scheduledReferences = new TreeSet<>(new MessageReferenceComparator());
 
-   public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor) {
+   private final QueuePendingMessageMetrics metrics;
+
+   public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor,
+         final Queue queue) {
       this.scheduledExecutor = scheduledExecutor;
+      this.metrics = new QueuePendingMessageMetrics(queue);
    }
 
    @Override
@@ -76,13 +80,27 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
       synchronized (scheduledReferences) {
          scheduledReferences.add(new RefScheduled(ref, tail));
       }
+      metrics.incrementMetrics(ref);
    }
 
    @Override
    public int getScheduledCount() {
-      synchronized (scheduledReferences) {
-         return scheduledReferences.size();
-      }
+      return metrics.getMessageCount();
+   }
+
+   @Override
+   public int getDurableScheduledCount() {
+      return metrics.getDurableMessageCount();
+   }
+
+   @Override
+   public long getScheduledSize() {
+      return metrics.getPersistentSize();
+   }
+
+   @Override
+   public long getDurableScheduledSize() {
+      return metrics.getDurablePersistentSize();
    }
 
    @Override
@@ -109,6 +127,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
             if (filter == null || filter.match(ref.getMessage())) {
                iter.remove();
                refs.add(ref);
+               metrics.decrementMetrics(ref);
             }
          }
       }
@@ -123,6 +142,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
             MessageReference ref = iter.next().getRef();
             if (ref.getMessage().getMessageID() == id) {
                iter.remove();
+               metrics.decrementMetrics(ref);
                return ref;
             }
          }
@@ -205,6 +225,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
                }
 
                iter.remove();
+               metrics.decrementMetrics(reference);
 
                reference.setScheduledDeliveryTime(0);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 4fda8b3..2cae2c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
@@ -63,7 +64,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleRandom() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       long nextMessage = 0;
       long NUMBER_OF_SEQUENCES = 100000;
@@ -88,7 +89,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleSameTimeHeadAndTail() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       long time = System.currentTimeMillis() + 10000;
       for (int i = 10001; i < 20000; i++) {
@@ -110,7 +111,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleFixedSample() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       addMessage(handler, 0, 48L, true);
       addMessage(handler, 1, 75L, true);
@@ -124,7 +125,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleWithAddHeads() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       addMessage(handler, 0, 1, true);
       addMessage(handler, 1, 2, true);
@@ -145,7 +146,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
    @Test
    public void testScheduleFixedSampleTailAndHead() throws Exception {
-      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null);
+      ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(null, new FakeQueueForScheduleUnitTest(0));
 
       // mix a sequence of tails / heads, but at the end this was supposed to be all sequential
       addMessage(handler, 1, 48L, true);
@@ -191,8 +192,9 @@ public class ScheduledDeliveryHandlerTest extends Assert {
    private void internalSchedule(ExecutorService executor, ScheduledThreadPoolExecutor scheduler) throws Exception {
       final int NUMBER_OF_MESSAGES = 200;
       int NUMBER_OF_THREADS = 20;
-      final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler);
+
       final FakeQueueForScheduleUnitTest fakeQueue = new FakeQueueForScheduleUnitTest(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
+      final ScheduledDeliveryHandlerImpl handler = new ScheduledDeliveryHandlerImpl(scheduler, fakeQueue);
 
       final long now = System.currentTimeMillis();
 
@@ -776,6 +778,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public void sendBuffer(ByteBuf buffer, int count) {
 
       }
+
+      @Override
+      public long getPersistentSize() throws ActiveMQException {
+         return 0;
+      }
    }
 
    public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
@@ -1017,12 +1024,52 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public long getPersistentSize() {
+         return 0;
+      }
+
+      @Override
+      public long getDurableMessageCount() {
+         return 0;
+      }
+
+      @Override
+      public long getDurablePersistentSize() {
+         return 0;
+      }
+
+      @Override
       public int getDeliveringCount() {
          return 0;
       }
 
       @Override
-      public void referenceHandled() {
+      public long getDeliveringSize() {
+         return 0;
+      }
+
+      @Override
+      public int getDurableDeliveringCount() {
+         return 0;
+      }
+
+      @Override
+      public long getDurableDeliveringSize() {
+         return 0;
+      }
+
+      @Override
+      public int getDurableScheduledCount() {
+         return 0;
+      }
+
+      @Override
+      public long getDurableScheduledSize() {
+         return 0;
+      }
+
+      @Override
+      public void referenceHandled(MessageReference ref) {
 
       }
 
@@ -1032,6 +1079,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public long getScheduledSize() {
+         return 0;
+      }
+
+      @Override
       public List<MessageReference> getScheduledMessages() {
          return null;
       }
@@ -1310,7 +1362,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public SimpleString getUser() {
          return null;
       }
-
       @Override
       public boolean isLastValue() {
          return false;
@@ -1326,13 +1377,5 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
       }
 
-      @Override
-      public void decDelivering(int size) {
-
-      }
-
-
-
-
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index b256eb9..3a9a785 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -587,7 +587,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounter(long txID, long queueID, long value) throws Exception {
+      public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
          return 0;
       }
 
@@ -612,12 +612,12 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
+      public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception {
          return 0;
       }
 
       @Override
-      public long storePageCounterInc(long queueID, int add) throws Exception {
+      public long storePageCounterInc(long queueID, int add, long size) throws Exception {
          return 0;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
new file mode 100644
index 0000000..4ef4425
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
@@ -0,0 +1,37 @@
+package metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+
+//validate metrics are recovered
+Object[] queueControls = server.getJMSServerManager().getActiveMQServer().getManagementService().getResources(QueueControl.class);
+for (Object o : queueControls) {
+    QueueControl c = (QueueControl) o;
+    GroovyRun.assertTrue(c.getPersistentSize() > 0);
+    GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
+    GroovyRun.assertEquals(16l, c.getMessageCount());
+    GroovyRun.assertEquals(16l, c.getDurableMessageCount());
+ }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
index fe58505..78d1241 100644
--- a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
+++ b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
@@ -31,8 +31,10 @@ String id = arg[1];
 String type = arg[2];
 String producer = arg[3];
 String consumer = arg[4];
+String globalMaxSize = arg[5];
 
 println("type = " + type);
+println("globalMaxSize = " + globalMaxSize);
 
 configuration = new ConfigurationImpl();
 configuration.setJournalType(JournalType.NIO);
@@ -44,6 +46,10 @@ configuration.setPersistenceEnabled(persistent);
 try {
     if (!type.startsWith("ARTEMIS-1")) {
         configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true));
+        if (globalMaxSize != null) {
+            configuration.getAddressesSettings().get("#").setPageSizeBytes(globalMaxSize);
+            configuration.setGlobalMaxSize(Long.parseLong(globalMaxSize));
+        }
     }
 } catch (Throwable e) {
     // need to ignore this for 1.4

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
index 40da24c..958db27 100644
--- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.activemq.artemis.tests.compatibility;
 
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -28,9 +31,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
-import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
-
 /**
  * To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
  *
@@ -105,5 +105,42 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
       evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
    }
 
+   /**
+    * Test that the server starts properly using an old journal even though persistent size
+    * metrics were not originaly stored
+    */
+   @Test
+   public void testSendReceiveQueueMetrics() throws Throwable {
+      setVariable(senderClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), senderClassloader, "journalTest");
+      evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
+      stopServer(senderClassloader);
+
+      setVariable(receiverClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), receiverClassloader, "journalTest");
+
+      setVariable(receiverClassloader, "latch", null);
+      evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
+   }
+
+   /**
+    * Test that the metrics are recovered when paging.  Even though the paging counts won't
+    * be persisted the journal the server should still start properly.  The persistent sizes
+    * will be recovered when the messages are depaged
+    */
+   @Test
+   public void testSendReceiveSizeQueueMetricsPaging() throws Throwable {
+      setVariable(senderClassloader, "persistent", true);
+      //Set max size to 1 to cause messages to immediately go to the paging store
+      startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1));
+      evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
+      stopServer(senderClassloader);
+
+      setVariable(receiverClassloader, "persistent", true);
+      startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1));
+
+
+      evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");
+   }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
index e2b9648..9001180 100644
--- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/VersionedBaseTest.java
@@ -189,6 +189,10 @@ public abstract class VersionedBaseTest {
    }
 
    public void startServer(File folder, ClassLoader loader, String serverName) throws Throwable {
+      startServer(folder, loader, serverName, null);
+   }
+
+   public void startServer(File folder, ClassLoader loader, String serverName, String globalMaxSize) throws Throwable {
       folder.mkdirs();
 
       System.out.println("Folder::" + folder);
@@ -202,9 +206,8 @@ public abstract class VersionedBaseTest {
          scriptToUse = "servers/hornetqServer.groovy";
       }
 
-      evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver);
+      evaluate(loader, scriptToUse, folder.getAbsolutePath(), serverName, server, sender, receiver, globalMaxSize);
    }
-
    public void stopServer(ClassLoader loader) throws Throwable {
       execute(loader, "server.stop()");
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 078c397..511d476 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -817,5 +817,10 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       public Map<String, Object> toPropertyMap() {
          return null;
       }
+
+      @Override
+      public long getPersistentSize() throws ActiveMQException {
+         return 0;
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index d37d134..fe9ba17 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -668,8 +668,8 @@ public class SendAckFailTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounter(long txID, long queueID, long value) throws Exception {
-         return manager.storePageCounter(txID, queueID, value);
+      public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
+         return manager.storePageCounter(txID, queueID, value, size);
       }
 
       @Override
@@ -693,13 +693,13 @@ public class SendAckFailTest extends ActiveMQTestBase {
       }
 
       @Override
-      public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
-         return manager.storePageCounterInc(txID, queueID, add);
+      public long storePageCounterInc(long txID, long queueID, int add, long size) throws Exception {
+         return manager.storePageCounterInc(txID, queueID, add, size);
       }
 
       @Override
-      public long storePageCounterInc(long queueID, int add) throws Exception {
-         return manager.storePageCounterInc(queueID, add);
+      public long storePageCounterInc(long queueID, int add, long size) throws Exception {
+         return manager.storePageCounterInc(queueID, add, size);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
index 8836ba8..9967e76 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
@@ -122,6 +122,21 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
       return control.getMessageCount();
    }
 
+   protected long getDurableMessageCount(QueueControl control) throws Exception {
+      control.flushExecutor();
+      return control.getDurableMessageCount();
+   }
+
+   protected long getMessageSize(QueueControl control) throws Exception {
+      control.flushExecutor();
+      return control.getPersistentSize();
+   }
+
+   protected long getDurableMessageSize(QueueControl control) throws Exception {
+      control.flushExecutor();
+      return control.getDurablePersistentSize();
+   }
+
    protected long getMessagesAdded(QueueControl control) throws Exception {
       control.flushExecutor();
       return control.getMessagesAdded();

Reply | Threaded
Open this post in threaded view
|

[4/6] activemq-artemis git commit: ARTEMIS-1663 - Add new message count and size metrics

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1663 - Add new message count and size metrics

Adding new metrics for tracking message counts and sizes on a Queue.
This includes tracking metrics for pending, delivering and scheduled
messages.  The paging store also tracks message size now.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ea70af15
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ea70af15
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ea70af15

Branch: refs/heads/master
Commit: ea70af15a3fbfc1b6ac86589e1e2e04a79ca3e23
Parents: 2eac195
Author: Christopher L. Shannon (cshannon) <[hidden email]>
Authored: Mon Feb 5 13:24:31 2018 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Feb 8 11:35:12 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  11 +-
 .../api/core/management/QueueControl.java       |  58 ++
 .../artemis/core/message/impl/CoreMessage.java  |   5 +
 .../core/message/impl/MessageInternalImpl.java  |   5 +
 .../artemis/core/journal/impl/JournalImpl.java  |   6 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  13 +-
 .../core/protocol/openwire/OpenwireMessage.java |   9 +-
 .../core/management/impl/QueueControlImpl.java  | 110 +++-
 .../artemis/core/paging/PagedMessage.java       |  10 +
 .../core/paging/cursor/PagePosition.java        |   4 +
 .../core/paging/cursor/PageSubscription.java    |   9 +
 .../paging/cursor/PageSubscriptionCounter.java  |  16 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |  19 +-
 .../paging/cursor/impl/PagePositionImpl.java    |  22 +
 .../impl/PageSubscriptionCounterImpl.java       | 146 ++++-
 .../cursor/impl/PageSubscriptionImpl.java       |  69 +-
 .../core/paging/impl/PagedMessageImpl.java      |   6 +
 .../core/paging/impl/PagingStoreImpl.java       |  11 +-
 .../core/persistence/StorageManager.java        |   6 +-
 .../journal/AbstractJournalStorageManager.java  |  37 +-
 .../impl/journal/DescribeJournal.java           |  61 +-
 .../impl/journal/LargeServerMessageImpl.java    |  10 +-
 .../journal/codec/PageCountPendingImpl.java     |   5 +-
 .../impl/journal/codec/PageCountRecord.java     |  18 +-
 .../impl/journal/codec/PageCountRecordInc.java  |  18 +-
 .../impl/nullpm/NullStorageManager.java         |   6 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   2 +-
 .../core/server/ActiveMQServerLogger.java       |   4 +
 .../artemis/core/server/MessageReference.java   |  11 +
 .../activemq/artemis/core/server/Queue.java     |  34 +-
 .../core/server/ScheduledDeliveryHandler.java   |   6 +
 .../core/server/cluster/impl/BridgeImpl.java    |   2 +-
 .../core/server/impl/LastValueQueue.java        |  10 +-
 .../core/server/impl/MessageReferenceImpl.java  |   8 +-
 .../server/impl/PostOfficeJournalLoader.java    |  13 +-
 .../artemis/core/server/impl/QueueImpl.java     | 144 +++-
 .../server/impl/QueuePendingMessageMetrics.java | 147 +++++
 .../impl/ScheduledDeliveryHandlerImpl.java      |  29 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  75 ++-
 .../transaction/impl/TransactionImplTest.java   |   6 +-
 .../main/resources/metrics/queueMetrics.groovy  |  37 ++
 .../main/resources/servers/artemisServer.groovy |   6 +
 .../compatibility/JournalCompatibilityTest.java |  43 +-
 .../tests/compatibility/VersionedBaseTest.java  |   7 +-
 .../integration/client/AcknowledgeTest.java     |   5 +
 .../integration/client/SendAckFailTest.java     |  12 +-
 .../management/ManagementTestBase.java          |  15 +
 .../management/QueueControlTest.java            | 452 +++++++------
 .../management/QueueControlUsingCoreTest.java   |  55 +-
 .../integration/paging/PagingCounterTest.java   |  21 +-
 .../AbstractPersistentStatTestSupport.java      | 213 ++++++
 .../metrics/JournalPageCountSizeTest.java       | 144 ++++
 .../metrics/JournalPendingMessageTest.java      | 651 +++++++++++++++++++
 .../unit/core/postoffice/impl/FakeQueue.java    |  47 +-
 .../core/server/impl/fakes/FakeConsumer.java    |   4 +-
 55 files changed, 2502 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index d24cd95..031c426 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -657,6 +657,15 @@ public interface Message {
 
    int getMemoryEstimate();
 
-
+   /**
+    * This is the size of the message when persisted on disk which is used for metrics tracking
+    * Note that even if the message itself is not persisted on disk (ie non-durable) this value is
+    * still used for metrics tracking
+    * If a normal message it will be the encoded message size
+    * If a large message it will be encoded message size + large message body size
+    * @return
+    * @throws ActiveMQException
+    */
+   long getPersistentSize() throws ActiveMQException;
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 447417f..2578684 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -81,12 +81,52 @@ public interface QueueControl {
    long getMessageCount();
 
    /**
+    * Returns the persistent size of all messages currently in this queue. The persistent size of a message
+    * is the amount of space the message would take up on disk which is used to track how much data there
+    * is to consume on this queue
+    */
+   @Attribute(desc = "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)")
+   long getPersistentSize();
+
+   /**
+    * Returns the number of durable messages currently in this queue.
+    */
+   @Attribute(desc = "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
+   long getDurableMessageCount();
+
+   /**
+    * Returns the persistent size of durable messages currently in this queue. The persistent size of a message
+    * is the amount of space the message would take up on disk which is used to track how much data there
+    * is to consume on this queue
+    */
+   @Attribute(desc = "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
+   long getDurablePersistentSize();
+
+   /**
     * Returns the number of scheduled messages in this queue.
     */
    @Attribute(desc = "number of scheduled messages in this queue")
    long getScheduledCount();
 
    /**
+    * Returns the size of scheduled messages in this queue.
+    */
+   @Attribute(desc = "persistent size of scheduled messages in this queue")
+   long getScheduledSize();
+
+   /**
+    * Returns the number of durable scheduled messages in this queue.
+    */
+   @Attribute(desc = "number of durable scheduled messages in this queue")
+   long getDurableScheduledCount();
+
+   /**
+    * Returns the size of durable scheduled messages in this queue.
+    */
+   @Attribute(desc = "persistent size of durable scheduled messages in this queue")
+   long getDurableScheduledSize();
+
+   /**
     * Returns the number of consumers consuming messages from this queue.
     */
    @Attribute(desc = "number of consumers consuming messages from this queue")
@@ -99,6 +139,24 @@ public interface QueueControl {
    int getDeliveringCount();
 
    /**
+    * Returns the persistent size of messages that this queue is currently delivering to its consumers.
+    */
+   @Attribute(desc = "persistent size of messages that this queue is currently delivering to its consumers")
+   long getDeliveringSize();
+
+   /**
+    * Returns the number of durable messages that this queue is currently delivering to its consumers.
+    */
+   @Attribute(desc = "number of durable messages that this queue is currently delivering to its consumers")
+   int getDurableDeliveringCount();
+
+   /**
+    * Returns the size of durable messages that this queue is currently delivering to its consumers.
+    */
+   @Attribute(desc = "persistent size of durable messages that this queue is currently delivering to its consumers")
+   long getDurableDeliveringSize();
+
+   /**
     * Returns the number of messages added to this queue since it was created.
     */
    @Attribute(desc = "number of messages added to this queue since it was created")

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 0fb7c3e..172cc18 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -1150,4 +1150,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
       return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return getEncodeSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
index 56ff816..17cb828 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
@@ -695,4 +695,9 @@ public class MessageInternalImpl implements MessageInternal {
       return new TypedProperties(message.getTypedProperties());
    }
 
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return message.getPersistentSize();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 88204d4..34ee72e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -191,7 +191,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
    private Executor appendExecutor = null;
 
-   private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
+   private final ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
 
    private final ExecutorFactory providedIOThreadPool;
    protected ExecutorFactory ioExecutorFactory;
@@ -2413,7 +2413,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                                  final List<JournalFile> newFiles,
                                                  final List<Pair<String, String>> renames) throws Exception {
 
-      return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames);
+      return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, renames);
    }
 
 
@@ -2763,7 +2763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       ArrayList<String> newFiles = new ArrayList<>();
       ArrayList<Pair<String, String>> renames = new ArrayList<>();
 
-      SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles, renames);
+      SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames);
       if (controlFile != null) {
          for (String dataFile : dataFiles) {
             SequentialFile file = fileFactory.createSequentialFile(dataFile);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index cdab412..2d72cf9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -24,10 +24,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
@@ -60,6 +58,10 @@ import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
 public class AMQPMessage extends RefCountMessage {
 
@@ -1179,4 +1181,9 @@ public class AMQPMessage extends RefCountMessage {
    private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
       return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return getEncodeSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index c63fe19..45e8953 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.protocol.openwire;
 
 import java.util.Set;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
@@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
+import io.netty.buffer.ByteBuf;
+
 // TODO: Implement this
 public class OpenwireMessage implements Message {
 
@@ -496,4 +498,9 @@ public class OpenwireMessage implements Message {
    public int getMemoryEstimate() {
       return 0;
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return 0;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index cefcbf9..e678ab8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -227,6 +227,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public long getPersistentSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getPersistentSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableMessageCount() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableMessageCount();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurablePersistentSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurablePersistentSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public int getConsumerCount() {
       checkStarted();
 
@@ -251,6 +287,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public long getDeliveringSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDeliveringSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public int getDurableDeliveringCount() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableDeliveringCount();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableDeliveringSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableDeliveringSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public long getMessagesAdded() {
       checkStarted();
 
@@ -323,6 +395,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public long getScheduledSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getScheduledSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableScheduledCount() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableScheduledCount();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableScheduledSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableScheduledSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public String getDeadLetterAddress() {
       checkStarted();
 
@@ -998,7 +1106,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       try {
          long index = 0;
          long start = (page - 1) * pageSize;
-         long end = Math.min((long)(page * pageSize), queue.getMessageCount());
+         long end = Math.min(page * pageSize, queue.getMessageCount());
 
          ArrayList<CompositeData> c = new ArrayList<>();
          Filter thefilter = FilterImpl.createFilter(filter);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
index 0124f09..5b39691 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.paging;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -38,4 +39,13 @@ public interface PagedMessage extends EncodingSupport {
    void initMessage(StorageManager storageManager);
 
    long getTransactionID();
+
+   /**
+    * This is the size of the message when persisted on disk and is used for metrics tracking
+    * If a normal message it will be the encoded message size
+    * If a large message it will be encoded message size + large message body size
+    * @return
+    * @throws ActiveMQException
+    */
+   long getPersistentSize() throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
index 00955b7..a9e0537 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
@@ -28,6 +28,10 @@ public interface PagePosition extends Comparable<PagePosition> {
 
    int getMessageNr();
 
+   long getPersistentSize();
+
+   void setPersistentSize(long persistentSize);
+
    PagePosition nextMessage();
 
    PagePosition nextPage();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 985f563..b11362d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -44,6 +44,8 @@ public interface PageSubscription {
 
    long getMessageCount();
 
+   long getPersistentSize();
+
    long getId();
 
    boolean isPersistent();
@@ -161,4 +163,11 @@ public interface PageSubscription {
     * @throws Exception
     */
    void onDeletePage(Page deletedPage) throws Exception;
+
+   long getDeliveredCount();
+
+   long getDeliveredSize();
+
+   void incrementDeliveredSize(long size);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
index 37cdb3b..33b744f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
@@ -26,13 +26,17 @@ public interface PageSubscriptionCounter {
 
    long getValue();
 
-   void increment(Transaction tx, int add) throws Exception;
+   long getPersistentSizeAdded();
 
-   void loadValue(long recordValueID, long value);
+   long getPersistentSize();
 
-   void loadInc(long recordInd, int add);
+   void increment(Transaction tx, int add, long persistentSize) throws Exception;
 
-   void applyIncrementOnTX(Transaction tx, long recordID, int add);
+   void loadValue(long recordValueID, long value, long persistentSize);
+
+   void loadInc(long recordInd, int add, long persistentSize);
+
+   void applyIncrementOnTX(Transaction tx, long recordID, int add, long persistentSize);
 
    /**
     * This will process the reload
@@ -43,12 +47,12 @@ public interface PageSubscriptionCounter {
     * @param id
     * @param variance
     */
-   void addInc(long id, int variance);
+   void addInc(long id, int variance, long size);
 
    // used when deleting the counter
    void delete() throws Exception;
 
-   void pendingCounter(Page page, int increment) throws Exception;
+   void pendingCounter(Page page, int increment, long persistentSize) throws Exception;
 
    // used when leaving page mode, so the counters are deleted in batches
    // for each queue on the address

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 42c5423..f5d49cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -59,6 +59,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private final long messageID;
 
+   private long messageSize = -1;
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -104,6 +106,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       this.largeMessage = message.getMessage().isLargeMessage();
       this.transactionID = message.getTransactionID();
       this.messageID = message.getMessage().getMessageID();
+
+      //pre-cache the message size so we don't have to reload the message later if it is GC'd
+      getPersistentSize();
    }
 
    @Override
@@ -191,7 +196,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    @Override
    public void handled() {
-      getQueue().referenceHandled();
+      getQueue().referenceHandled(this);
    }
 
    @Override
@@ -280,4 +285,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       return messageID;
    }
 
+   @Override
+   public long getPersistentSize() {
+      if (messageSize == -1) {
+         try {
+            messageSize = getPagedMessage().getPersistentSize();
+         } catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
+         }
+      }
+      return messageSize;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
index 076f872..52d1c83 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
@@ -37,6 +37,12 @@ public class PagePositionImpl implements PagePosition {
    private long recordID = -1;
 
    /**
+    * Optional size value that can be set to specify the peristent size of the message
+    * for metrics tracking purposes
+    */
+   private long persistentSize;
+
+   /**
     * @param pageNr
     * @param messageNr
     */
@@ -82,6 +88,22 @@ public class PagePositionImpl implements PagePosition {
       return messageNr;
    }
 
+   /**
+    * @return the persistentSize
+    */
+   @Override
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
+   /**
+    * @param persistentSize the persistentSize to set
+    */
+   @Override
+   public void setPersistentSize(long persistentSize) {
+      this.persistentSize = persistentSize;
+   }
+
    @Override
    public int compareTo(PagePosition o) {
       if (pageNr > o.getPageNr()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 01ad778..3bb56f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -21,10 +21,10 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -60,10 +60,13 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    private final Executor executor;
 
    private final AtomicLong value = new AtomicLong(0);
+   private final AtomicLong persistentSize = new AtomicLong(0);
 
    private final AtomicLong added = new AtomicLong(0);
+   private final AtomicLong addedPersistentSize = new AtomicLong(0);
 
    private final AtomicLong pendingValue = new AtomicLong(0);
+   private final AtomicLong pendingPersistentSize = new AtomicLong(0);
 
    private final LinkedList<Long> incrementRecords = new LinkedList<>();
 
@@ -71,9 +74,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    // we will recount a page case we still see pending records
    // as soon as we close a page we remove these records replacing by a regular page increment record
    // A Map per pageID, each page will have a set of IDs, with the increment on each one
-   private final Map<Long, Pair<Long, AtomicInteger>> pendingCounters = new HashMap<>();
+   private final Map<Long, PendingCounter> pendingCounters = new HashMap<>();
 
-   private LinkedList<Pair<Long, Integer>> loadList;
+   private LinkedList<PendingCounter> loadList;
 
    private final Runnable cleanupCheck = new Runnable() {
       @Override
@@ -104,6 +107,16 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       return value.get() + pendingValue.get();
    }
 
+   @Override
+   public long getPersistentSizeAdded() {
+      return addedPersistentSize.get() + pendingPersistentSize.get();
+   }
+
+   @Override
+   public long getPersistentSize() {
+      return persistentSize.get() + pendingPersistentSize.get();
+   }
+
    /**
     * This is used only on non transactional paging
     *
@@ -112,24 +125,25 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
     * @throws Exception
     */
    @Override
-   public synchronized void pendingCounter(Page page, int increment) throws Exception {
+   public synchronized void pendingCounter(Page page, int increment, long size) throws Exception {
       if (!persistent) {
          return; // nothing to be done
       }
 
-      Pair<Long, AtomicInteger> pendingInfo = pendingCounters.get((long) page.getPageId());
+      PendingCounter pendingInfo = pendingCounters.get((long) page.getPageId());
       if (pendingInfo == null) {
          // We have to make sure this is sync here
          // not syncing this to disk may cause the page files to be out of sync on pages.
          // we can't afford the case where a page file is written without a record here
          long id = storage.storePendingCounter(this.subscriptionID, page.getPageId(), increment);
-         pendingInfo = new Pair<>(id, new AtomicInteger(1));
+         pendingInfo = new PendingCounter(id, increment, size);
          pendingCounters.put((long) page.getPageId(), pendingInfo);
       } else {
-         pendingInfo.getB().addAndGet(increment);
+         pendingInfo.addAndGet(increment, size);
       }
 
       pendingValue.addAndGet(increment);
+      pendingPersistentSize.addAndGet(size);
 
       page.addPendingCounter(this);
    }
@@ -141,23 +155,25 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
     */
    @Override
    public void cleanupNonTXCounters(final long pageID) throws Exception {
-      Pair<Long, AtomicInteger> pendingInfo;
+      PendingCounter pendingInfo;
       synchronized (this) {
          pendingInfo = pendingCounters.remove(pageID);
       }
 
       if (pendingInfo != null) {
-         final AtomicInteger valueCleaned = pendingInfo.getB();
+         final int valueCleaned = pendingInfo.getCount();
+         final long valueSizeCleaned = pendingInfo.getPersistentSize();
          Transaction tx = new TransactionImpl(storage);
-         storage.deletePendingPageCounter(tx.getID(), pendingInfo.getA());
+         storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId());
 
          // To apply the increment of the value just being cleaned
-         increment(tx, valueCleaned.get());
+         increment(tx, valueCleaned, valueSizeCleaned);
 
          tx.addOperation(new TransactionOperationAbstract() {
             @Override
             public void afterCommit(Transaction tx) {
-               pendingValue.addAndGet(-valueCleaned.get());
+               pendingValue.addAndGet(-valueCleaned);
+               pendingPersistentSize.updateAndGet(val -> val >= valueSizeCleaned ? val - valueSizeCleaned : 0);
             }
          });
 
@@ -166,21 +182,21 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    }
 
    @Override
-   public void increment(Transaction tx, int add) throws Exception {
+   public void increment(Transaction tx, int add, long size) throws Exception {
       if (tx == null) {
          if (persistent) {
-            long id = storage.storePageCounterInc(this.subscriptionID, add);
-            incrementProcessed(id, add);
+            long id = storage.storePageCounterInc(this.subscriptionID, add, size);
+            incrementProcessed(id, add, size);
          } else {
-            incrementProcessed(-1, add);
+            incrementProcessed(-1, add, size);
          }
       } else {
          if (persistent) {
             tx.setContainsPersistent();
-            long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-            applyIncrementOnTX(tx, id, add);
+            long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add, size);
+            applyIncrementOnTX(tx, id, add, size);
          } else {
-            applyIncrementOnTX(tx, -1, add);
+            applyIncrementOnTX(tx, -1, add, size);
          }
       }
    }
@@ -193,7 +209,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
     * @param add
     */
    @Override
-   public void applyIncrementOnTX(Transaction tx, long recordID1, int add) {
+   public void applyIncrementOnTX(Transaction tx, long recordID1, int add, long size) {
       CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
 
       if (oper == null) {
@@ -202,22 +218,24 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
          tx.addOperation(oper);
       }
 
-      oper.operations.add(new ItemOper(this, recordID1, add));
+      oper.operations.add(new ItemOper(this, recordID1, add, size));
    }
 
    @Override
-   public synchronized void loadValue(final long recordID1, final long value1) {
+   public synchronized void loadValue(final long recordID1, final long value1, long size) {
       if (this.subscription != null) {
          // it could be null on testcases... which is ok
          this.subscription.notEmpty();
       }
       this.value.set(value1);
       this.added.set(value1);
+      this.persistentSize.set(size);
+      this.addedPersistentSize.set(size);
       this.recordID = recordID1;
    }
 
-   public synchronized void incrementProcessed(long id, int add) {
-      addInc(id, add);
+   public synchronized void incrementProcessed(long id, int add, long size) {
+      addInc(id, add, size);
       if (incrementRecords.size() > FLUSH_COUNTER) {
          executor.execute(cleanupCheck);
       }
@@ -259,12 +277,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    }
 
    @Override
-   public void loadInc(long id, int add) {
+   public void loadInc(long id, int add, long size) {
       if (loadList == null) {
          loadList = new LinkedList<>();
       }
 
-      loadList.add(new Pair<>(id, add));
+      loadList.add(new PendingCounter(id, add, size));
    }
 
    @Override
@@ -275,10 +293,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
             subscription.notEmpty();
          }
 
-         for (Pair<Long, Integer> incElement : loadList) {
-            value.addAndGet(incElement.getB());
-            added.addAndGet(incElement.getB());
-            incrementRecords.add(incElement.getA());
+         for (PendingCounter incElement : loadList) {
+            value.addAndGet(incElement.getCount());
+            added.addAndGet(incElement.getCount());
+            persistentSize.addAndGet(incElement.getPersistentSize());
+            addedPersistentSize.addAndGet(incElement.getPersistentSize());
+            incrementRecords.add(incElement.getId());
          }
          loadList.clear();
          loadList = null;
@@ -286,11 +306,15 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
    }
 
    @Override
-   public synchronized void addInc(long id, int variance) {
+   public synchronized void addInc(long id, int variance, long size) {
       value.addAndGet(variance);
+      this.persistentSize.addAndGet(size);
       if (variance > 0) {
          added.addAndGet(variance);
       }
+      if (size > 0) {
+         addedPersistentSize.addAndGet(size);
+      }
       if (id >= 0) {
          incrementRecords.add(id);
       }
@@ -310,11 +334,13 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       ArrayList<Long> deleteList;
 
       long valueReplace;
+      long sizeReplace;
       synchronized (this) {
          if (incrementRecords.size() <= FLUSH_COUNTER) {
             return;
          }
          valueReplace = value.get();
+         sizeReplace = persistentSize.get();
          deleteList = new ArrayList<>(incrementRecords);
          incrementRecords.clear();
       }
@@ -332,7 +358,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
             storage.deletePageCounter(txCleanup, recordID);
          }
 
-         newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace);
+         newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace);
 
          if (logger.isTraceEnabled()) {
             logger.trace("Replacing page-counter record = " + recordID + " by record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " for queue = " + this.subscription.getQueue().getName());
@@ -354,10 +380,11 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
 
    private static class ItemOper {
 
-      private ItemOper(PageSubscriptionCounterImpl counter, long id, int add) {
+      private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, long persistentSize) {
          this.counter = counter;
          this.id = id;
          this.amount = add;
+         this.persistentSize = persistentSize;
       }
 
       PageSubscriptionCounterImpl counter;
@@ -365,6 +392,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       long id;
 
       int amount;
+
+      long persistentSize;
    }
 
    private static class CounterOperations extends TransactionOperationAbstract implements TransactionOperation {
@@ -374,8 +403,55 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
       @Override
       public void afterCommit(Transaction tx) {
          for (ItemOper oper : operations) {
-            oper.counter.incrementProcessed(oper.id, oper.amount);
+            oper.counter.incrementProcessed(oper.id, oper.amount, oper.persistentSize);
          }
       }
    }
+
+   private static class PendingCounter {
+      private static final AtomicIntegerFieldUpdater<PendingCounter> COUNT_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PendingCounter.class, "count");
+
+      private static final AtomicLongFieldUpdater<PendingCounter> SIZE_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(PendingCounter.class, "persistentSize");
+
+      private final long id;
+      private volatile int count;
+      private volatile long persistentSize;
+
+      /**
+       * @param id
+       * @param count
+       * @param size
+       */
+      PendingCounter(long id, int count, long persistentSize) {
+         super();
+         this.id = id;
+         this.count = count;
+         this.persistentSize = persistentSize;
+      }
+      /**
+       * @return the id
+       */
+      public long getId() {
+         return id;
+      }
+      /**
+       * @return the count
+       */
+      public int getCount() {
+         return count;
+      }
+      /**
+       * @return the size
+       */
+      public long getPersistentSize() {
+         return persistentSize;
+      }
+
+      public void addAndGet(int count, long persistentSize) {
+         COUNT_UPDATER.addAndGet(this, count);
+         SIZE_UPDATER.addAndGet(this, persistentSize);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 24c69be..924aace 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -96,6 +96,8 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    private final AtomicLong deliveredCount = new AtomicLong(0);
 
+   private final AtomicLong deliveredSize = new AtomicLong(0);
+
    PageSubscriptionImpl(final PageCursorProvider cursorProvider,
                         final PagingStore pageStore,
                         final StorageManager store,
@@ -178,6 +180,18 @@ final class PageSubscriptionImpl implements PageSubscription {
    }
 
    @Override
+   public long getPersistentSize() {
+      if (empty) {
+         return 0;
+      } else {
+         //A negative value could happen if an old journal was loaded that didn't have
+         //size metrics for old records
+         long messageSize = counter.getPersistentSize() - deliveredSize.get();
+         return messageSize > 0 ? messageSize : 0;
+      }
+   }
+
+   @Override
    public PageSubscriptionCounter getCounter() {
       return counter;
    }
@@ -439,7 +453,7 @@ final class PageSubscriptionImpl implements PageSubscription {
    public void ackTx(final Transaction tx, final PagedReference reference) throws Exception {
       confirmPosition(tx, reference.getPosition());
 
-      counter.increment(tx, -1);
+      counter.increment(tx, -1, -getPersistentSize(reference));
 
       PageTransactionInfo txInfo = getPageTransaction(reference);
       if (txInfo != null) {
@@ -831,6 +845,12 @@ final class PageSubscriptionImpl implements PageSubscription {
       }
 
       PageCursorInfo info = getPageInfo(position);
+      PageCache cache = info.getCache();
+      long size = 0;
+      if (cache != null) {
+         size = getPersistentSize(cache.getMessage(position.getMessageNr()));
+         position.setPersistentSize(size);
+      }
 
       logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", position, info);
 
@@ -1060,6 +1080,13 @@ final class PageSubscriptionImpl implements PageSubscription {
          }
       }
 
+      /**
+       * @return the cache
+       */
+      public PageCache getCache() {
+         return cache != null ? cache.get() : null;
+      }
+
    }
 
    private final class PageCursorTX extends TransactionOperationAbstract {
@@ -1087,6 +1114,7 @@ final class PageSubscriptionImpl implements PageSubscription {
             for (PagePosition confirmed : positions) {
                cursor.processACK(confirmed);
                cursor.deliveredCount.decrementAndGet();
+               cursor.deliveredSize.addAndGet(-confirmed.getPersistentSize());
             }
 
          }
@@ -1309,4 +1337,43 @@ final class PageSubscriptionImpl implements PageSubscription {
       public void close() {
       }
    }
+
+   /**
+    * @return the deliveredCount
+    */
+   @Override
+   public long getDeliveredCount() {
+      return deliveredCount.get();
+   }
+
+   /**
+    * @return the deliveredSize
+    */
+   @Override
+   public long getDeliveredSize() {
+      return deliveredSize.get();
+   }
+
+   @Override
+   public void incrementDeliveredSize(long size) {
+      deliveredSize.addAndGet(size);
+   }
+
+   private long getPersistentSize(PagedMessage msg) {
+      try {
+         return msg != null && msg.getPersistentSize() > 0 ? msg.getPersistentSize() : 0;
+      } catch (ActiveMQException e) {
+         logger.warn("Error computing persistent size of message: " + msg, e);
+         return 0;
+      }
+   }
+
+   private long getPersistentSize(PagedReference ref) {
+      try {
+         return ref != null && ref.getPersistentSize() > 0 ? ref.getPersistentSize() : 0;
+      } catch (ActiveMQException e) {
+         logger.warn("Error computing persistent size of message: " + ref, e);
+         return 0;
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index d7bd05c..3ef833d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -169,4 +170,9 @@ public class PagedMessageImpl implements PagedMessage {
          message +
          "]";
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return message.getPersistentSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f1beb31..0eec5a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -840,7 +840,8 @@ public class PagingStoreImpl implements PagingStore {
             // the apply counter will make sure we write a record on journal
             // especially on the case for non transactional sends and paging
             // doing this will give us a possibility of recovering the page counters
-            applyPageCounters(tx, getCurrentPage(), listCtx);
+            long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
+            applyPageCounters(tx, getCurrentPage(), listCtx, persistentSize);
 
             currentPage.write(pagedMessage);
 
@@ -906,22 +907,22 @@ public class PagingStoreImpl implements PagingStore {
     * @param ctx
     * @throws Exception
     */
-   private void applyPageCounters(Transaction tx, Page page, RouteContextList ctx) throws Exception {
+   private void applyPageCounters(Transaction tx, Page page, RouteContextList ctx, long size) throws Exception {
       List<org.apache.activemq.artemis.core.server.Queue> durableQueues = ctx.getDurableQueues();
       List<org.apache.activemq.artemis.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
       for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) {
          if (tx == null) {
             // non transactional writes need an intermediate place
             // to avoid the counter getting out of sync
-            q.getPageSubscription().getCounter().pendingCounter(page, 1);
+            q.getPageSubscription().getCounter().pendingCounter(page, 1, size);
          } else {
             // null tx is treated through pending counters
-            q.getPageSubscription().getCounter().increment(tx, 1);
+            q.getPageSubscription().getCounter().increment(tx, 1, size);
          }
       }
 
       for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) {
-         q.getPageSubscription().getCounter().increment(tx, 1);
+         q.getPageSubscription().getCounter().increment(tx, 1, size);
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 6defb1e..f9793d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -336,7 +336,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
    /**
     * @return The ID with the stored counter
     */
-   long storePageCounter(long txID, long queueID, long value) throws Exception;
+   long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception;
 
    long storePendingCounter(long queueID, long pageID, int inc) throws Exception;
 
@@ -350,13 +350,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     * @return the ID with the increment record
     * @throws Exception
     */
-   long storePageCounterInc(long txID, long queueID, int add) throws Exception;
+   long storePageCounterInc(long txID, long queueID, int add, long persistentSize) throws Exception;
 
    /**
     * @return the ID with the increment record
     * @throws Exception
     */
-   long storePageCounterInc(long queueID, int add) throws Exception;
+   long storePageCounterInc(long queueID, int add, long size) throws Exception;
 
    /**
     * @return the bindings journal

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 34d249e..ada5b90 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -16,7 +16,13 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.security.DigestInputStream;
@@ -37,6 +43,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Message;
@@ -109,13 +117,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.jboss.logging.Logger;
 
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
-
 /**
  * Controls access to the journals and other storage files such as the ones used to store pages and
  * large messages.  This class must control writing of any non-transient data, as it is the key point
@@ -1084,7 +1085,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().loadValue(record.id, encoding.getValue());
+                     sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
                   } else {
                      ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
                      messageJournal.appendDeleteRecord(record.id, false);
@@ -1101,7 +1102,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().loadInc(record.id, encoding.getValue());
+                     sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize());
                   } else {
                      ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
                      messageJournal.appendDeleteRecord(record.id, false);
@@ -1136,6 +1137,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: {
 
                   PageCountPendingImpl pendingCountEncoding = new PageCountPendingImpl();
+
                   pendingCountEncoding.decode(buff);
                   pendingCountEncoding.setID(record.id);
 
@@ -1143,6 +1145,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   if (pendingNonTXPageCounter != null) {
                      pendingNonTXPageCounter.add(pendingCountEncoding);
                   }
+
                   break;
                }
 
@@ -1349,11 +1352,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    }
 
    @Override
-   public long storePageCounterInc(long txID, long queueID, int value) throws Exception {
+   public long storePageCounterInc(long txID, long queueID, int value, long persistentSize) throws Exception {
       readLock();
       try {
          long recordID = idGenerator.generateID();
-         messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value));
+         messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize));
          return recordID;
       } finally {
          readUnLock();
@@ -1361,11 +1364,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    }
 
    @Override
-   public long storePageCounterInc(long queueID, int value) throws Exception {
+   public long storePageCounterInc(long queueID, int value, long persistentSize) throws Exception {
       readLock();
       try {
          final long recordID = idGenerator.generateID();
-         messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value), true, getContext());
+         messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize), true, getContext());
          return recordID;
       } finally {
          readUnLock();
@@ -1373,11 +1376,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    }
 
    @Override
-   public long storePageCounter(long txID, long queueID, long value) throws Exception {
+   public long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception {
       readLock();
       try {
          final long recordID = idGenerator.generateID();
-         messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value));
+         messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value, persistentSize));
          return recordID;
       } finally {
          readUnLock();
@@ -1789,7 +1792,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue());
+                     sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize());
                      sub.notEmpty();
                   } else {
                      ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index bfabc25..6cd417b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -16,7 +16,29 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
+
 import java.io.File;
 import java.io.PrintStream;
 import java.util.HashMap;
@@ -24,6 +46,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Message;
@@ -58,29 +82,6 @@ import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.XidCodecSupport;
 
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
-
 
 /**
  * Outputs a String description of the Journals contents.
@@ -217,9 +218,9 @@ public final class DescribeJournal {
                      out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
                   }
 
-                  subsCounter.loadValue(info.id, encoding.getValue());
+                  subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize());
                   subsCounter.processReload();
-                  out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue());
+                  out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + ", result=" + subsCounter.getValue());
                   if (subsCounter.getValue() < 0) {
                      out.println(" #NegativeCounter!!!!");
                   } else {
@@ -232,9 +233,9 @@ public final class DescribeJournal {
 
                   PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
 
-                  subsCounter.loadInc(info.id, encoding.getValue());
+                  subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize());
                   subsCounter.processReload();
-                  out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue());
+                  out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + " increased by " + encoding.getValue());
                   if (subsCounter.getValue() < 0) {
                      out.println(" #NegativeCounter!!!!");
                   } else {
@@ -321,7 +322,7 @@ public final class DescribeJournal {
 
             subsCounter = lookupCounter(counters, queueIDForCounter);
 
-            subsCounter.loadValue(info.id, encoding.getValue());
+            subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize());
             subsCounter.processReload();
          } else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
             PageCountRecordInc encoding = (PageCountRecordInc) o;
@@ -329,7 +330,7 @@ public final class DescribeJournal {
 
             subsCounter = lookupCounter(counters, queueIDForCounter);
 
-            subsCounter.loadInc(info.id, encoding.getValue());
+            subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize());
             subsCounter.processReload();
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 433031c..dabb039 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -35,6 +34,8 @@ import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
+import io.netty.buffer.Unpooled;
+
 public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage {
 
    // Constants -----------------------------------------------------
@@ -345,6 +346,13 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
    }
 
    @Override
+   public long getPersistentSize() throws ActiveMQException {
+      long size = super.getPersistentSize();
+      size += getBodyEncoder().getLargeBodySize();
+
+      return size;
+   }
+   @Override
    public String toString() {
       return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
          ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
index 56e8c87..e600d46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
@@ -23,9 +23,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
 
 public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
 
+
    @Override
    public String toString() {
-      return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + "]";
+      return "PageCountPendingImpl [queueID=" + queueID + ", pageID=" + pageID + "]";
    }
 
    public PageCountPendingImpl() {
@@ -64,7 +65,7 @@ public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
 
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG * 2;
+      return DataConstants.SIZE_LONG * 3;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
index 642feb2..af9e135 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
@@ -26,18 +26,21 @@ public class PageCountRecord implements EncodingSupport {
 
    private long value;
 
+   private long persistentSize;
+
    @Override
    public String toString() {
-      return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
+      return "PageCountRecord [queueID=" + queueID + ", value=" + value + ", persistentSize=" + persistentSize + "]";
    }
 
    public PageCountRecord() {
 
    }
 
-   public PageCountRecord(long queueID, long value) {
+   public PageCountRecord(long queueID, long value, long persistentSize) {
       this.queueID = queueID;
       this.value = value;
+      this.persistentSize = persistentSize;
    }
 
    public long getQueueID() {
@@ -48,21 +51,30 @@ public class PageCountRecord implements EncodingSupport {
       return value;
    }
 
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG * 2;
+      return DataConstants.SIZE_LONG * 3;
    }
 
    @Override
    public void encode(ActiveMQBuffer buffer) {
       buffer.writeLong(queueID);
       buffer.writeLong(value);
+      buffer.writeLong(persistentSize);
    }
 
    @Override
    public void decode(ActiveMQBuffer buffer) {
       queueID = buffer.readLong();
       value = buffer.readLong();
+
+      if (buffer.readableBytes() > 0) {
+         persistentSize = buffer.readLong();
+      }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
index e427d68..c174155 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
@@ -26,17 +26,20 @@ public class PageCountRecordInc implements EncodingSupport {
 
    private int value;
 
+   private long persistentSize;
+
    @Override
    public String toString() {
-      return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
+      return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + ", persistentSize=" + persistentSize + "]";
    }
 
    public PageCountRecordInc() {
    }
 
-   public PageCountRecordInc(long queueID, int value) {
+   public PageCountRecordInc(long queueID, int value, long persistentSize) {
       this.queueID = queueID;
       this.value = value;
+      this.persistentSize = persistentSize;
    }
 
    public long getQueueID() {
@@ -47,21 +50,30 @@ public class PageCountRecordInc implements EncodingSupport {
       return value;
    }
 
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+      return 2 * DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
    }
 
    @Override
    public void encode(ActiveMQBuffer buffer) {
       buffer.writeLong(queueID);
       buffer.writeInt(value);
+      buffer.writeLong(persistentSize);
    }
 
    @Override
    public void decode(ActiveMQBuffer buffer) {
       queueID = buffer.readLong();
       value = buffer.readInt();
+
+      if (buffer.readableBytes() > 0) {
+         persistentSize = buffer.readLong();
+      }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 32f9010..8c5e11c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -467,7 +467,7 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public long storePageCounter(final long txID, final long queueID, final long value) throws Exception {
+   public long storePageCounter(final long txID, final long queueID, final long value, final long size) throws Exception {
       return 0;
    }
 
@@ -489,12 +489,12 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public long storePageCounterInc(final long txID, final long queueID, final int add) throws Exception {
+   public long storePageCounterInc(final long txID, final long queueID, final int add, final long size) throws Exception {
       return 0;
    }
 
    @Override
-   public long storePageCounterInc(final long queueID, final int add) throws Exception {
+   public long storePageCounterInc(final long queueID, final int add, final long size) throws Exception {
       return 0;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 12c722a..73d6953 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1480,7 +1480,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       public void afterPrepare(final Transaction tx) {
          for (MessageReference ref : refs) {
             if (ref.isAlreadyAcked()) {
-               ref.getQueue().referenceHandled();
+               ref.getQueue().referenceHandled(ref);
                ref.getQueue().incrementMesssagesAdded();
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 7ae1ee4..10c827e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1911,4 +1911,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224088, value = "Timeout ({0} seconds) while handshaking has occurred.", format = Message.Format.MESSAGE_FORMAT)
    void handshakeTimeout(int timeout);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 224089, value = "Failed to calculate persistent size", format = Message.Format.MESSAGE_FORMAT)
+   void errorCalculatePersistentSize(@Cause Throwable e);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 906ea7e..d9145b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server;
 
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@@ -99,4 +100,14 @@ public interface MessageReference {
    void setAlreadyAcked();
 
    boolean isAlreadyAcked();
+
+   /**
+    * This is the size of the message when persisted on disk which is used for metrics tracking
+    * Note that even if the message itself is not persisted on disk (ie non-durable) this value is
+    * still used for metrics tracking for the amount of data on a queue
+    *
+    * @return
+    * @throws ActiveMQException
+    */
+   long getPersistentSize() throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ff4e82b..c355dbf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -138,12 +138,42 @@ public interface Queue extends Bindable,CriticalComponent {
 
    long getMessageCount();
 
+   /**
+    * This is the size of the messages in the queue when persisted on disk which is used for metrics tracking
+    * to give an idea of the amount of data on the queue to be consumed
+    *
+    * Note that this includes all messages on the queue, even messages that are non-durable which may only be in memory
+    */
+   long getPersistentSize();
+
+   /**
+    * This is the number of the durable messages in the queue
+    */
+   long getDurableMessageCount();
+
+   /**
+    * This is the persistent size of all the durable messages in the queue
+    */
+   long getDurablePersistentSize();
+
    int getDeliveringCount();
 
-   void referenceHandled();
+   long getDeliveringSize();
+
+   int getDurableDeliveringCount();
+
+   long getDurableDeliveringSize();
+
+   void referenceHandled(MessageReference ref);
 
    int getScheduledCount();
 
+   long getScheduledSize();
+
+   int getDurableScheduledCount();
+
+   long getDurableScheduledSize();
+
    List<MessageReference> getScheduledMessages();
 
    /**
@@ -314,8 +344,6 @@ public interface Queue extends Bindable,CriticalComponent {
     */
    SimpleString getUser();
 
-   void decDelivering(int size);
-
    /** This is to perform a check on the counter again */
    void recheckRefCount(OperationContext context);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
index 62fad5e..1dc2eda 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
@@ -27,6 +27,12 @@ public interface ScheduledDeliveryHandler {
 
    int getScheduledCount();
 
+   long getScheduledSize();
+
+   int getDurableScheduledCount();
+
+   long getDurableScheduledSize();
+
    List<MessageReference> getScheduledReferences();
 
    List<MessageReference> cancel(Filter filter) throws ActiveMQException;

Reply | Threaded
Open this post in threaded view
|

[5/6] activemq-artemis git commit: ARTEMIS-1663 Tweak on Paging test

clebertsuconic-2
In reply to this post by clebertsuconic-2
ARTEMIS-1663 Tweak on Paging test

Making sure it JournalCompatibilityTest/paging is actually paging


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/58c46604
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/58c46604
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/58c46604

Branch: refs/heads/master
Commit: 58c4660426eceabecfaecaad42716562d5e2fd92
Parents: ea70af1
Author: Clebert Suconic <[hidden email]>
Authored: Thu Feb 8 11:45:14 2018 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Feb 8 11:45:18 2018 -0500

----------------------------------------------------------------------
 .../journalcompatibility/forcepaging.groovy     | 23 ++++++++++++++++++++
 .../journalcompatibility/ispaging.groovy        | 23 ++++++++++++++++++++
 .../main/resources/servers/artemisServer.groovy |  5 +++--
 .../compatibility/JournalCompatibilityTest.java |  3 +++
 4 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58c46604/tests/compatibility-tests/src/main/resources/journalcompatibility/forcepaging.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/forcepaging.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/forcepaging.groovy
new file mode 100644
index 0000000..032bcc1
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/forcepaging.groovy
@@ -0,0 +1,23 @@
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
+queue.getPageSubscription().getPagingStore().startPaging();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58c46604/tests/compatibility-tests/src/main/resources/journalcompatibility/ispaging.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/journalcompatibility/ispaging.groovy b/tests/compatibility-tests/src/main/resources/journalcompatibility/ispaging.groovy
new file mode 100644
index 0000000..a6dea7d
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/journalcompatibility/ispaging.groovy
@@ -0,0 +1,23 @@
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.server.Queue
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+Queue queue = server.getJMSServerManager().getActiveMQServer().locateQueue(SimpleString.toSimpleString("queue"))
+GroovyRun.assertTrue(queue.getPageSubscription().getPagingStore().isPaging())

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58c46604/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
index 78d1241..913c971 100644
--- a/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
+++ b/tests/compatibility-tests/src/main/resources/servers/artemisServer.groovy
@@ -19,7 +19,8 @@ package servers
 // starts an artemis server
 
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
-import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.JournalType
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
 import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
@@ -47,7 +48,7 @@ try {
     if (!type.startsWith("ARTEMIS-1")) {
         configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true));
         if (globalMaxSize != null) {
-            configuration.getAddressesSettings().get("#").setPageSizeBytes(globalMaxSize);
+            configuration.getAddressesSettings().get("#").setPageSizeBytes(globalMaxSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
             configuration.setGlobalMaxSize(Long.parseLong(globalMaxSize));
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58c46604/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
index 958db27..171e721 100644
--- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java
@@ -133,11 +133,14 @@ public class JournalCompatibilityTest extends VersionedBaseTest {
       setVariable(senderClassloader, "persistent", true);
       //Set max size to 1 to cause messages to immediately go to the paging store
       startServer(serverFolder.getRoot(), senderClassloader, "journalTest", Long.toString(1));
+      evaluate(senderClassloader, "journalcompatibility/forcepaging.groovy");
       evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
+      evaluate(senderClassloader, "journalcompatibility/ispaging.groovy");
       stopServer(senderClassloader);
 
       setVariable(receiverClassloader, "persistent", true);
       startServer(serverFolder.getRoot(), receiverClassloader, "journalTest", Long.toString(1));
+      evaluate(receiverClassloader, "journalcompatibility/ispaging.groovy");
 
 
       evaluate(receiverClassloader, "metrics/queueMetrics.groovy", server, receiver, "receiveMessages");

Reply | Threaded
Open this post in threaded view
|

[6/6] activemq-artemis git commit: This closes #1853

clebertsuconic-2
In reply to this post by clebertsuconic-2
This closes #1853


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4ef6e328
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4ef6e328
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4ef6e328

Branch: refs/heads/master
Commit: 4ef6e3281dc695258f14a95ea76e1be2e8df62ea
Parents: 2eac195 58c4660
Author: Clebert Suconic <[hidden email]>
Authored: Thu Feb 8 12:10:11 2018 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Feb 8 12:10:11 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  11 +-
 .../api/core/management/QueueControl.java       |  58 ++
 .../artemis/core/message/impl/CoreMessage.java  |   5 +
 .../core/message/impl/MessageInternalImpl.java  |   5 +
 .../artemis/core/journal/impl/JournalImpl.java  |   6 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  13 +-
 .../core/protocol/openwire/OpenwireMessage.java |   9 +-
 .../core/management/impl/QueueControlImpl.java  | 110 +++-
 .../artemis/core/paging/PagedMessage.java       |  10 +
 .../core/paging/cursor/PagePosition.java        |   4 +
 .../core/paging/cursor/PageSubscription.java    |   9 +
 .../paging/cursor/PageSubscriptionCounter.java  |  16 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |  19 +-
 .../paging/cursor/impl/PagePositionImpl.java    |  22 +
 .../impl/PageSubscriptionCounterImpl.java       | 146 ++++-
 .../cursor/impl/PageSubscriptionImpl.java       |  69 +-
 .../core/paging/impl/PagedMessageImpl.java      |   6 +
 .../core/paging/impl/PagingStoreImpl.java       |  11 +-
 .../core/persistence/StorageManager.java        |   6 +-
 .../journal/AbstractJournalStorageManager.java  |  37 +-
 .../impl/journal/DescribeJournal.java           |  61 +-
 .../impl/journal/LargeServerMessageImpl.java    |  10 +-
 .../journal/codec/PageCountPendingImpl.java     |   5 +-
 .../impl/journal/codec/PageCountRecord.java     |  18 +-
 .../impl/journal/codec/PageCountRecordInc.java  |  18 +-
 .../impl/nullpm/NullStorageManager.java         |   6 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   2 +-
 .../core/server/ActiveMQServerLogger.java       |   4 +
 .../artemis/core/server/MessageReference.java   |  11 +
 .../activemq/artemis/core/server/Queue.java     |  34 +-
 .../core/server/ScheduledDeliveryHandler.java   |   6 +
 .../core/server/cluster/impl/BridgeImpl.java    |   2 +-
 .../core/server/impl/LastValueQueue.java        |  10 +-
 .../core/server/impl/MessageReferenceImpl.java  |   8 +-
 .../server/impl/PostOfficeJournalLoader.java    |  13 +-
 .../artemis/core/server/impl/QueueImpl.java     | 144 +++-
 .../server/impl/QueuePendingMessageMetrics.java | 147 +++++
 .../impl/ScheduledDeliveryHandlerImpl.java      |  29 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  75 ++-
 .../transaction/impl/TransactionImplTest.java   |   6 +-
 .../journalcompatibility/forcepaging.groovy     |  23 +
 .../journalcompatibility/ispaging.groovy        |  23 +
 .../main/resources/metrics/queueMetrics.groovy  |  37 ++
 .../main/resources/servers/artemisServer.groovy |   9 +-
 .../compatibility/JournalCompatibilityTest.java |  46 +-
 .../tests/compatibility/VersionedBaseTest.java  |   7 +-
 .../integration/client/AcknowledgeTest.java     |   5 +
 .../integration/client/SendAckFailTest.java     |  12 +-
 .../management/ManagementTestBase.java          |  15 +
 .../management/QueueControlTest.java            | 452 +++++++------
 .../management/QueueControlUsingCoreTest.java   |  55 +-
 .../integration/paging/PagingCounterTest.java   |  21 +-
 .../AbstractPersistentStatTestSupport.java      | 213 ++++++
 .../metrics/JournalPageCountSizeTest.java       | 144 ++++
 .../metrics/JournalPendingMessageTest.java      | 651 +++++++++++++++++++
 .../unit/core/postoffice/impl/FakeQueue.java    |  47 +-
 .../core/server/impl/fakes/FakeConsumer.java    |   4 +-
 57 files changed, 2553 insertions(+), 392 deletions(-)
----------------------------------------------------------------------