activemq git commit: [AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are agressively cached

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

activemq git commit: [AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are agressively cached

gtully-2
Repository: activemq
Updated Branches:
  refs/heads/master 4535e8f09 -> ec6fa1909


[AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are agressively cached


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ec6fa190
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ec6fa190
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ec6fa190

Branch: refs/heads/master
Commit: ec6fa190999160676cab900038b268b2d40a4d5c
Parents: 4535e8f
Author: gtully <[hidden email]>
Authored: Thu Jan 11 12:56:40 2018 +0000
Committer: gtully <[hidden email]>
Committed: Thu Jan 11 12:56:40 2018 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  4 +-
 .../store/kahadb/disk/journal/Location.java     | 13 +--
 .../DataFileAppenderNoSpaceNoBatchTest.java     |  2 +-
 .../org/apache/activemq/bugs/AMQ6815Test.java   | 95 ++++++++++++++++++++
 4 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index b391de7..94de6ea 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2132,8 +2132,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
             try {
                 location.getLatch().await();
-                if (location.getBatch().exception.get() != null) {
-                    throw location.getBatch().exception.get();
+                if (location.getException().get() != null) {
+                    throw location.getException().get();
                 }
             } catch (InterruptedException e) {
                 throw new InterruptedIOException(e.toString());

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
index f3da47a..673d9f6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Used as a location in the data store.
@@ -36,7 +37,8 @@ public final class Location implements Comparable<Location> {
     private int offset = NOT_SET;
     private int size = NOT_SET;
     private byte type = NOT_SET_TYPE;
-    private DataFileAppender.WriteBatch batch;
+    private CountDownLatch latch;
+    private AtomicReference<IOException> exception;
 
     public Location() {
     }
@@ -114,11 +116,12 @@ public final class Location implements Comparable<Location> {
     }
 
     public CountDownLatch getLatch() {
-        return batch.latch;
+        return latch;
     }
 
     public void setBatch(DataFileAppender.WriteBatch batch) {
-        this.batch = batch;
+        this.latch = batch.latch;
+        this.exception = batch.exception;
     }
 
     public int compareTo(Location o) {
@@ -142,7 +145,7 @@ public final class Location implements Comparable<Location> {
         return dataFileId ^ offset;
     }
 
-    public DataFileAppender.WriteBatch getBatch() {
-        return batch;
+    public AtomicReference<IOException> getException() {
+        return exception;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
index a6b19ee..6d778c3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
@@ -183,7 +183,7 @@ public class DataFileAppenderNoSpaceNoBatchTest {
 
         boolean someExceptions = false;
         for (Location location: locations) {
-            someExceptions |= (location.getBatch().exception != null);
+            someExceptions |= (location.getException().get() != null);
         }
         assertTrue(someExceptions);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec6fa190/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
new file mode 100644
index 0000000..0b41195
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+public class AMQ6815Test {
+   static final Logger LOG = LoggerFactory.getLogger(AMQ6815Test.class);
+   private final static int MEM_LIMIT = 5*1024*1024;
+   private final static byte[] payload = new byte[5*1024];
+
+      protected BrokerService brokerService;
+      protected Connection connection;
+      protected Session session;
+      protected Queue amqDestination;
+
+      @Before
+      public void setUp() throws Exception {
+         brokerService = new BrokerService();
+         PolicyEntry policy = new PolicyEntry();
+         policy.setMemoryLimit(MEM_LIMIT);
+         PolicyMap pMap = new PolicyMap();
+         pMap.setDefaultEntry(policy);
+         brokerService.setDestinationPolicy(pMap);
+
+         brokerService.start();
+         connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
+         connection.start();
+         session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE);
+         amqDestination = session.createQueue("QQ");
+      }
+
+      @After
+      public void tearDown() throws Exception {
+         if (connection != null) {
+            connection.close();
+         }
+         brokerService.stop();
+      }
+
+      @Test(timeout = 60000)
+      public void testHeapUsage() throws Exception {
+         Runtime.getRuntime().gc();
+         final long initUsedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+         sendMessages(10000);
+         Runtime.getRuntime().gc();
+         long usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() - initUsedMemory;
+         LOG.info("Mem in use: " + usedMem/1024  + "K");
+         assertTrue("Used Mem reasonable " + usedMem, usedMem < 5*MEM_LIMIT);
+      }
+
+      protected void sendMessages(int count) throws JMSException {
+         MessageProducer producer = session.createProducer(amqDestination);
+         for (int i = 0; i < count; i++) {
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes(payload);
+            producer.send(bytesMessage);
+         }
+         producer.close();
+      }
+
+}