[1/2] activemq-artemis git commit: This closes #1827

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/2] activemq-artemis git commit: This closes #1827

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 33b265ca6 -> 2eac1959d


This closes #1827


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2eac1959
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2eac1959
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2eac1959

Branch: refs/heads/master
Commit: 2eac1959df62335c50fb70cd38800f905eb05af8
Parents: 33b265c 822445a
Author: Clebert Suconic <[hidden email]>
Authored: Thu Feb 8 09:12:57 2018 -0500
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Feb 8 09:12:57 2018 -0500

----------------------------------------------------------------------
 .../core/paging/cursor/PagedReference.java      |  4 ++++
 .../core/paging/cursor/PagedReferenceImpl.java  | 24 ++++++++++++++++++++
 .../cursor/impl/PageSubscriptionImpl.java       |  4 ++--
 .../artemis/core/server/MessageReference.java   |  2 ++
 .../core/server/impl/LastValueQueue.java        |  5 ++++
 .../core/server/impl/MessageReferenceImpl.java  |  5 ++++
 .../artemis/core/server/impl/RefsOperation.java |  5 +++-
 .../core/server/impl/ServerConsumerImpl.java    |  2 +-
 8 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


Reply | Threaded
Open this post in threaded view
|

[2/2] activemq-artemis git commit: ARTEMIS-1650 Improve paged message acknowledge

clebertsuconic-2
ARTEMIS-1650 Improve paged message acknowledge

Cache `messageID`, `transactionID` and `isLargeMessage`
in PagedReference, so that when acknowledge, we do not have to
get PagedMessage which may be GCed and cause re-read entire page.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/822445a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/822445a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/822445a7

Branch: refs/heads/master
Commit: 822445a717f943c64a84b2ac6a0af8ace9e5cd23
Parents: 33b265c
Author: huaishk <[hidden email]>
Authored: Wed Jan 31 09:48:43 2018 +0800
Committer: Clebert Suconic <[hidden email]>
Committed: Thu Feb 8 09:12:57 2018 -0500

----------------------------------------------------------------------
 .../core/paging/cursor/PagedReference.java      |  4 ++++
 .../core/paging/cursor/PagedReferenceImpl.java  | 24 ++++++++++++++++++++
 .../cursor/impl/PageSubscriptionImpl.java       |  4 ++--
 .../artemis/core/server/MessageReference.java   |  2 ++
 .../core/server/impl/LastValueQueue.java        |  5 ++++
 .../core/server/impl/MessageReferenceImpl.java  |  5 ++++
 .../artemis/core/server/impl/RefsOperation.java |  5 +++-
 .../core/server/impl/ServerConsumerImpl.java    |  2 +-
 8 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
index c1ff089..be2d042 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
@@ -24,4 +24,8 @@ public interface PagedReference extends MessageReference {
    PagePosition getPosition();
 
    PagedMessage getPagedMessage();
+
+   boolean isLargeMessage();
+
+   long getTransactionID();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 7189007..42c5423 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -53,6 +53,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private Object protocolData;
 
+   private final boolean largeMessage;
+
+   private final long transactionID;
+
+   private final long messageID;
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -95,6 +101,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       this.position = position;
       this.message = new WeakReference<>(message);
       this.subscription = subscription;
+      this.largeMessage = message.getMessage().isLargeMessage();
+      this.transactionID = message.getTransactionID();
+      this.messageID = message.getMessage().getMessageID();
    }
 
    @Override
@@ -256,4 +265,19 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       return this.consumerId;
    }
 
+   @Override
+   public boolean isLargeMessage() {
+      return largeMessage;
+   }
+
+   @Override
+   public long getTransactionID() {
+      return transactionID;
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index a674935..24c69be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -849,8 +849,8 @@ final class PageSubscriptionImpl implements PageSubscription {
    }
 
    private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
-      if (reference.getPagedMessage().getTransactionID() >= 0) {
-         return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
+      if (reference.getTransactionID() >= 0) {
+         return pageStore.getPagingManager().getTransaction(reference.getTransactionID());
       } else {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 799b0b0..906ea7e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -38,6 +38,8 @@ public interface MessageReference {
 
    Message getMessage();
 
+   long getMessageID();
+
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,
     * so we need to perform some extra steps on paging.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 7aada5e..90b8814 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -238,6 +238,11 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
+      public long getMessageID() {
+         return getMessage().getMessageID();
+      }
+
+      @Override
       public Queue getQueue() {
          return ref.getQueue();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 1b434bc..7543ba5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -147,6 +147,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    }
 
    @Override
+   public long getMessageID() {
+      return getMessage().getMessageID();
+   }
+
+   @Override
    public Queue getQueue() {
       return queue;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 6bf69ed..e492985 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -159,7 +160,9 @@ public class RefsOperation extends TransactionOperationAbstract {
 
       if (pagedMessagesToPostACK != null) {
          for (MessageReference refmsg : pagedMessagesToPostACK) {
-            decrementRefCount(refmsg);
+            if (((PagedReference) refmsg).isLargeMessage()) {
+               decrementRefCount(refmsg);
+            }
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 45dd05c..95d613e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -867,7 +867,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
             acks++;
          }
-         while (ref.getMessage().getMessageID() != messageID);
+         while (ref.getMessageID() != messageID);
 
          if (startedTransaction) {
             tx.commit();