activemq git commit: [AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are agressively cached

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

activemq git commit: [AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are agressively cached

cshannon
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x 7f75b4b9a -> d8f8ae9f9


[AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are agressively cached

(cherry picked from commit ec6fa190999160676cab900038b268b2d40a4d5c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d8f8ae9f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d8f8ae9f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d8f8ae9f

Branch: refs/heads/activemq-5.15.x
Commit: d8f8ae9f925b905968c51b386049c5b2d9b6b2d0
Parents: 7f75b4b
Author: gtully <[hidden email]>
Authored: Thu Jan 11 12:56:40 2018 +0000
Committer: Christopher L. Shannon (cshannon) <[hidden email]>
Committed: Thu Jan 11 08:07:58 2018 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  4 +-
 .../store/kahadb/disk/journal/Location.java     | 13 +--
 .../DataFileAppenderNoSpaceNoBatchTest.java     |  2 +-
 .../org/apache/activemq/bugs/AMQ6815Test.java   | 95 ++++++++++++++++++++
 4 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d8f8ae9f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index b391de7..94de6ea 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2132,8 +2132,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
             try {
                 location.getLatch().await();
-                if (location.getBatch().exception.get() != null) {
-                    throw location.getBatch().exception.get();
+                if (location.getException().get() != null) {
+                    throw location.getException().get();
                 }
             } catch (InterruptedException e) {
                 throw new InterruptedIOException(e.toString());

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8f8ae9f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
index f3da47a..673d9f6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Used as a location in the data store.
@@ -36,7 +37,8 @@ public final class Location implements Comparable<Location> {
     private int offset = NOT_SET;
     private int size = NOT_SET;
     private byte type = NOT_SET_TYPE;
-    private DataFileAppender.WriteBatch batch;
+    private CountDownLatch latch;
+    private AtomicReference<IOException> exception;
 
     public Location() {
     }
@@ -114,11 +116,12 @@ public final class Location implements Comparable<Location> {
     }
 
     public CountDownLatch getLatch() {
-        return batch.latch;
+        return latch;
     }
 
     public void setBatch(DataFileAppender.WriteBatch batch) {
-        this.batch = batch;
+        this.latch = batch.latch;
+        this.exception = batch.exception;
     }
 
     public int compareTo(Location o) {
@@ -142,7 +145,7 @@ public final class Location implements Comparable<Location> {
         return dataFileId ^ offset;
     }
 
-    public DataFileAppender.WriteBatch getBatch() {
-        return batch;
+    public AtomicReference<IOException> getException() {
+        return exception;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8f8ae9f/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
index a6b19ee..6d778c3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
@@ -183,7 +183,7 @@ public class DataFileAppenderNoSpaceNoBatchTest {
 
         boolean someExceptions = false;
         for (Location location: locations) {
-            someExceptions |= (location.getBatch().exception != null);
+            someExceptions |= (location.getException().get() != null);
         }
         assertTrue(someExceptions);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d8f8ae9f/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
new file mode 100644
index 0000000..0b41195
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+public class AMQ6815Test {
+   static final Logger LOG = LoggerFactory.getLogger(AMQ6815Test.class);
+   private final static int MEM_LIMIT = 5*1024*1024;
+   private final static byte[] payload = new byte[5*1024];
+
+      protected BrokerService brokerService;
+      protected Connection connection;
+      protected Session session;
+      protected Queue amqDestination;
+
+      @Before
+      public void setUp() throws Exception {
+         brokerService = new BrokerService();
+         PolicyEntry policy = new PolicyEntry();
+         policy.setMemoryLimit(MEM_LIMIT);
+         PolicyMap pMap = new PolicyMap();
+         pMap.setDefaultEntry(policy);
+         brokerService.setDestinationPolicy(pMap);
+
+         brokerService.start();
+         connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
+         connection.start();
+         session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE);
+         amqDestination = session.createQueue("QQ");
+      }
+
+      @After
+      public void tearDown() throws Exception {
+         if (connection != null) {
+            connection.close();
+         }
+         brokerService.stop();
+      }
+
+      @Test(timeout = 60000)
+      public void testHeapUsage() throws Exception {
+         Runtime.getRuntime().gc();
+         final long initUsedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+         sendMessages(10000);
+         Runtime.getRuntime().gc();
+         long usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() - initUsedMemory;
+         LOG.info("Mem in use: " + usedMem/1024  + "K");
+         assertTrue("Used Mem reasonable " + usedMem, usedMem < 5*MEM_LIMIT);
+      }
+
+      protected void sendMessages(int count) throws JMSException {
+         MessageProducer producer = session.createProducer(amqDestination);
+         for (int i = 0; i < count; i++) {
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes(payload);
+            producer.send(bytesMessage);
+         }
+         producer.close();
+      }
+
+}