[activemq-artemis] branch master updated: ARTEMIS-2859 - track owning page store as in a message reference to ensure correct usage tracking, only track size on the owning store, reference everywhere else via refUp

Previous Topic | Next Topic
 
Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] branch master updated: ARTEMIS-2859 - track owning page store as in a message reference to ensure correct usage tracking, only track size on the owning store, reference everywhere else via refUp

clebertsuconic-2
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e8ce9e  ARTEMIS-2859 - track owning page store as in a message reference to ensure correct usage tracking, only track size on the owning store, reference everywhere else via refUp
     new 4395a95  This closes #3282
4e8ce9e is described below

commit 4e8ce9ed1001f74dec600e3aff85ebe605d66a2a
Author: gtully <[hidden email]>
AuthorDate: Wed Oct 14 16:54:39 2020 +0100

    ARTEMIS-2859 - track owning page store as in a message reference to ensure correct usage tracking, only track size on the owning store, reference everywhere else via refUp
---
 .../core/paging/cursor/PagedReferenceImpl.java     |  11 +++
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  16 +---
 .../core/postoffice/impl/PostOfficeImpl.java       |  24 +++--
 .../artemis/core/server/MessageReference.java      |   9 +-
 .../apache/activemq/artemis/core/server/Queue.java |   4 +-
 .../server/impl/GroupFirstMessageReference.java    |  12 +++
 .../artemis/core/server/impl/LastValueQueue.java   |  10 ++
 .../core/server/impl/MessageReferenceImpl.java     |  18 +++-
 .../artemis/core/server/impl/QueueImpl.java        |  39 ++++----
 .../artemis/core/server/impl/RefsOperation.java    |   4 +-
 .../core/server/impl/ServerConsumerImpl.java       |   2 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   9 +-
 .../artemis/tests/util/ActiveMQTestBase.java       |   2 +-
 .../jms/cluster/TopicClusterPageStoreSizeTest.java | 105 +++++++++++++++++++++
 .../tests/integration/paging/GlobalPagingTest.java |   2 +-
 .../core/server/impl/QueueConcurrentTest.java      |   2 +-
 .../impl/fakes/FakeSequentialFileFactory.java      |   3 +-
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   4 +-
 18 files changed, 214 insertions(+), 62 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 76f5a05..27e6167 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -406,6 +407,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
    }
 
    @Override
+   public PagingStore getOwner() {
+      return null;
+   }
+
+   @Override
+   public void setOwner(PagingStore owner) {
+
+   }
+
+   @Override
    public boolean isDurable() {
       if (durable == UNDEFINED_IS_DURABLE) {
          durable = getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 6fb4797..31c969d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -966,11 +966,7 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public void refUp(Message message, int count) {
-      if (count == 1) {
-         this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
-      } else {
-         this.addSize(MessageReferenceImpl.getMemoryEstimate());
-      }
+      this.addSize(MessageReferenceImpl.getMemoryEstimate());
    }
 
    @Override
@@ -979,15 +975,7 @@ public class PagingStoreImpl implements PagingStore {
          // this could happen on paged messages since they are not routed and refUp is never called
          return;
       }
-
-      if (count == 0) {
-         this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
-
-      } else {
-         this.addSize(-MessageReferenceImpl.getMemoryEstimate());
-      }
-
-
+      this.addSize(-MessageReferenceImpl.getMemoryEstimate());
    }
 
    private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index a3576b3..a904baf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1201,7 +1201,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    @Override
    public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
 
-      MessageReference reference = MessageReference.Factory.createReference(message, queue);
+      MessageReference reference = MessageReference.Factory.createReference(message, queue, pagingManager.getPageStore(message.getAddressSimpleString()));
 
       Long scheduledDeliveryTime;
       if (message.hasScheduledDeliveryTime()) {
@@ -1211,6 +1211,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
       }
 
+      queue.refUp(reference);
       queue.durableUp(message);
 
       if (tx == null) {
@@ -1455,8 +1456,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          deliveryTime = message.getScheduledDeliveryTime();
       }
 
+      PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
       for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
-         PagingStore store = pagingManager.getPageStore(entry.getKey());
+         PagingStore store;
+         if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
+            store = owningStore;
+         } else {
+            store = pagingManager.getPageStore(entry.getKey());
+         }
 
          if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
             if (message.isLargeMessage()) {
@@ -1469,14 +1476,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
 
          for (Queue queue : entry.getValue().getNonDurableQueues()) {
-            MessageReference reference = MessageReference.Factory.createReference(message, queue);
+            MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
 
             if (deliveryTime != null) {
                reference.setScheduledDeliveryTime(deliveryTime);
             }
             refs.add(reference);
 
-            queue.refUp(message);
+            queue.refUp(reference);
          }
 
          Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
@@ -1484,7 +1491,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          while (iter.hasNext()) {
             Queue queue = iter.next();
 
-            MessageReference reference = MessageReference.Factory.createReference(message, queue);
+            MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
 
             if (context.isAlreadyAcked(context.getAddress(message), queue)) {
                reference.setAlreadyAcked();
@@ -1497,6 +1504,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                reference.setScheduledDeliveryTime(deliveryTime);
             }
             refs.add(reference);
+            queue.refUp(reference);
 
             if (message.isDurable()) {
                int durableRefCount = queue.durableUp(message);
@@ -1528,8 +1536,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                      storageManager.updateScheduledDeliveryTime(reference);
                   }
                }
-            } else {
-               queue.refUp(message);
             }
          }
       }
@@ -1852,12 +1858,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          // Reverse the ref counts, and paging sizes
 
          for (MessageReference ref : refs) {
+            ref.getQueue().refDown(ref);
             Message message = ref.getMessage();
-
             if (message.isDurable() && ref.getQueue().isDurable()) {
                ref.getQueue().durableDown(message);
-            } else {
-               ref.getQueue().refDown(message);
             }
          }
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index c55910b..6765e4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -22,6 +22,7 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -34,8 +35,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 public interface MessageReference {
 
    final class Factory {
-      public static MessageReference createReference(Message encode, final Queue queue) {
-         return new MessageReferenceImpl(encode, queue);
+      public static MessageReference createReference(Message encode, final Queue queue, PagingStore pageStore) {
+         return new MessageReferenceImpl(encode, queue, pageStore);
       }
    }
    boolean isPaged();
@@ -136,4 +137,8 @@ public interface MessageReference {
     * @throws ActiveMQException
     */
    long getPersistentSize() throws ActiveMQException;
+
+   PagingStore getOwner();
+
+   void setOwner(PagingStore owner);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 0f2a071..9ce31d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -68,9 +68,9 @@ public interface Queue extends Bindable,CriticalComponent {
 
    int durableDown(Message message);
 
-   void refUp(Message message);
+   void refUp(MessageReference messageReference);
 
-   void refDown(Message message);
+   void refDown(MessageReference messageReference);
 
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
index d4db19b..6e2f1fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -20,6 +20,7 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -37,6 +38,7 @@ public class GroupFirstMessageReference implements MessageReference {
    private final MessageReference messageReference;
    private final SimpleString key;
    private volatile Message message;
+   private volatile PagingStore owner;
 
    public GroupFirstMessageReference(SimpleString key, MessageReference messageReference) {
       this.messageReference = messageReference;
@@ -215,4 +217,14 @@ public class GroupFirstMessageReference implements MessageReference {
    public long getPersistentSize() throws ActiveMQException {
       return messageReference.getPersistentSize();
    }
+
+   @Override
+   public PagingStore getOwner() {
+      return this.owner;
+   }
+
+   @Override
+   public void setOwner(PagingStore owner) {
+      this.owner = owner;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 9f3c82b..3cb3d09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -538,6 +538,16 @@ public class LastValueQueue extends QueueImpl {
       public long getPersistentSize() throws ActiveMQException {
          return ref.getPersistentSize();
       }
+
+      @Override
+      public PagingStore getOwner() {
+         return ref.getOwner();
+      }
+
+      @Override
+      public void setOwner(PagingStore owner) {
+         ref.setOwner(owner);
+      }
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index dea1478..09b3650 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -35,6 +36,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
 
    private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
+   private volatile PagingStore owner;
 
    public static Comparator<MessageReference> getIDComparator() {
       return idComparator;
@@ -102,12 +104,16 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
       message = other.message;
 
       this.queue = queue;
+
+      this.owner = other.owner;
    }
 
-   public MessageReferenceImpl(final Message message, final Queue queue) {
+   public MessageReferenceImpl(final Message message, final Queue queue, final PagingStore owner) {
       this.message = message;
 
       this.queue = queue;
+
+      this.owner = owner;
    }
 
    // MessageReference implementation -------------------------------
@@ -348,4 +354,14 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    public long getPersistentSize() throws ActiveMQException {
       return this.getMessage().getPersistentSize();
    }
+
+   @Override
+   public PagingStore getOwner() {
+      return this.owner;
+   }
+
+   @Override
+   public void setOwner(PagingStore owner) {
+      this.owner = owner;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 0013506..35f66f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -970,36 +970,37 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public int durableUp(Message message) {
-      int count = message.durableUp();
-      if (pagingStore != null) {
-         pagingStore.durableUp(message, count);
-      }
-      return count;
+      return message.durableUp();
    }
 
    @Override
    public int durableDown(Message message) {
-      int count = message.durableDown();
-      if (pagingStore != null) {
-         pagingStore.durableDown(message, count);
-      }
-      return count;
+      return message.durableDown();
    }
 
    @Override
-   public void refUp(Message message) {
-      int count = message.refUp();
+   public void refUp(MessageReference messageReference) {
+      int count = messageReference.getMessage().refUp();
+      if (count == 1) {
+         if (messageReference.getOwner() != null) {
+            messageReference.getOwner().addSize(messageReference.getMessageMemoryEstimate());
+         }
+      }
       if (pagingStore != null) {
-         pagingStore.refUp(message, count);
+         pagingStore.refUp(messageReference.getMessage(), count);
       }
-
    }
 
    @Override
-   public void refDown(Message message) {
-      int count = message.refDown();
+   public void refDown(MessageReference messageReference) {
+      int count = messageReference.getMessage().refDown();
+      if (count == 0) {
+         if (messageReference.getOwner() != null) {
+            messageReference.getOwner().addSize(-messageReference.getMessageMemoryEstimate());
+         }
+      }
       if (pagingStore != null) {
-         pagingStore.refDown(message, count);
+         pagingStore.refDown(messageReference.getMessage(), count);
       }
    }
 
@@ -3826,6 +3827,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (message == null || (nonDestructive && reason == AckReason.NORMAL))
          return;
 
+      queue.refDown(ref);
+
       boolean durableRef = message.isDurable() && queue.isDurable();
 
       if (durableRef) {
@@ -3854,8 +3857,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
             }
          }
-      } else {
-         queue.refDown(message);
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 552c190..054ba73 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -144,7 +144,7 @@ public class RefsOperation extends TransactionOperationAbstract {
                   ackedTX.setContainsPersistent();
                }
 
-               ref.getQueue().refUp(message);
+               ref.getQueue().refUp(ref);
             }
             ackedTX.commit(true);
          } catch (Exception e) {
@@ -188,7 +188,7 @@ public class RefsOperation extends TransactionOperationAbstract {
          for (MessageReference refmsg : pagedMessagesToPostACK) {
             ((PagedReference)refmsg).removePendingFlag();
             if (((PagedReference) refmsg).isLargeMessage()) {
-               refmsg.getQueue().refDown(refmsg.getMessage());
+               refmsg.getQueue().refDown(refmsg);
             }
          }
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 7f71970..35a90e8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -646,7 +646,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    public void forceDelivery(final long sequence)  {
       forceDelivery(sequence, () -> {
          Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
-         MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
+         MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue, null);
          reference.setDeliveryCount(0);
 
          forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 00b205e..8996287 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -250,7 +250,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
                            long nextMessageID,
                            long nextScheduledTime,
                            boolean tail) {
-      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
+      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null, null);
       refImpl.setScheduledDeliveryTime(nextScheduledTime);
       handler.addInPlace(nextScheduledTime, refImpl, tail);
    }
@@ -260,7 +260,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
                                  long nextScheduledTime,
                                  boolean tail,
                                  Queue queue) {
-      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
+      MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue, null);
       refImpl.setScheduledDeliveryTime(nextScheduledTime);
       handler.checkAndSchedule(refImpl, tail);
    }
@@ -808,6 +808,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public long getPersistentSize() throws ActiveMQException {
          return 0;
       }
+
    }
 
    public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
@@ -843,12 +844,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void refUp(Message message) {
+      public void refUp(MessageReference messageReference) {
 
       }
 
       @Override
-      public void refDown(Message message) {
+      public void refDown(MessageReference messageReference) {
 
       }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index d1adf59..b538f8a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -2138,7 +2138,7 @@ public abstract class ActiveMQTestBase extends Assert {
    protected MessageReference generateReference(final Queue queue, final long id) {
       Message message = generateMessage(id);
 
-      return MessageReference.Factory.createReference(message, queue);
+      return MessageReference.Factory.createReference(message, queue, null);
    }
 
    protected int calculateRecordSize(final int size, final int alignment) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
new file mode 100644
index 0000000..b8bf8b8
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.cluster;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
+import org.junit.Test;
+
+public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
+
+   public static final String TOPIC = "jms.t1";
+
+   @Test
+   public void testPageStoreSizeWithClusteredDurableSub() throws Exception {
+      doTestPageStoreSizeWithClusteredDurableSub(false);
+   }
+
+   @Test
+   public void testPageStoreSizeWithClusteredDurableSubWithPaging() throws Exception {
+      doTestPageStoreSizeWithClusteredDurableSub(true);
+   }
+
+   private void doTestPageStoreSizeWithClusteredDurableSub(boolean forcePaging) throws Exception {
+
+      Connection conn1 = cf1.createConnection();
+
+      conn1.setClientID("someClient1");
+
+      Connection conn2 = cf2.createConnection();
+
+      conn2.setClientID("someClient2");
+
+      conn1.start();
+
+      conn2.start();
+
+      Topic topic1 = createTopic(TOPIC, true);
+
+      Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer prod1 = session1.createProducer(null);
+      prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      MessageConsumer cons1 = session1.createDurableSubscriber(topic1, "sub1");
+      MessageConsumer cons2 = session2.createDurableSubscriber(topic1, "sub2");
+
+      waitForBindings(server1, TOPIC, true, 1, 1, 2000);
+      waitForBindings(server2, TOPIC, true, 1, 1, 2000);
+      waitForBindings(server1, TOPIC, false, 1, 1, 2000);
+      waitForBindings(server2, TOPIC, false, 1, 1, 2000);
+
+      if (forcePaging) {
+         for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
+            server1.getPagingManager().getPageStore(psName).startPaging();
+         }
+         for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+            server2.getPagingManager().getPageStore(psName).startPaging();
+         }
+      }
+
+      prod1.send(topic1, session1.createTextMessage("someMessage"));
+
+      TextMessage m2 = (TextMessage) cons2.receive(5000);
+      assertNotNull(m2);
+      TextMessage m1 = (TextMessage) cons1.receive(5000);
+      assertTrue(m1.getJMSDestination().toString().contains(TOPIC));
+
+      assertNotNull(m1);
+
+      conn1.close();
+      conn2.close();
+
+      for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
+         assertTrue("non negative size: " + psName, server1.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
+      }
+
+      for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+         assertTrue("non negative size: " + psName, server2.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
+      }
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
index 3dee2b2..97c941e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -334,7 +334,7 @@ public class GlobalPagingTest extends PagingTest {
                int id = 1000;
                try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
                   final Queue queue = server.locateQueue(replyQueue);
-                  final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue);
+                  final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue, null);
                   reference.getMessage().setMessageID(id++);
                   //it will cause QueueImpl::directDeliver -> false
                   queue.addHead(reference, false);
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
index 6d73cfd..3cf0cae 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
@@ -140,7 +140,7 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
          while (System.currentTimeMillis() - start < testTime) {
             Message message = generateMessage(i);
 
-            MessageReference ref = MessageReference.Factory.createReference(message, queue);
+            MessageReference ref = MessageReference.Factory.createReference(message, queue, null);
 
             queue.addTail(ref, false);
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index 9091c81..0c5d89e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -312,11 +312,10 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
 
       @Override
       public void delete() {
+         fileMap.remove(fileName);
          if (open) {
             close();
          }
-
-         fileMap.remove(fileName);
       }
 
       @Override
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 1ab01c1..80a9252 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -76,12 +76,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void refUp(Message message) {
+   public void refUp(MessageReference messageReference) {
 
    }
 
    @Override
-   public void refDown(Message message) {
+   public void refDown(MessageReference messageReference) {
 
    }