[1/6] activemq-artemis git commit: ARTEMIS-1119 flow controlling connection

Previous Topic Next Topic
 
Classic | List | Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/6] activemq-artemis git commit: ARTEMIS-1119 flow controlling connection

jbertram
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7d5511cfb -> 1f82c783a


ARTEMIS-1119 flow controlling connection

https://issues.apache.org/jira/browse/ARTEMIS-1119


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/807e4e5d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/807e4e5d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/807e4e5d

Branch: refs/heads/master
Commit: 807e4e5d9cef75985af09286f13664282ee0a74c
Parents: 0a0955d
Author: Clebert Suconic <[hidden email]>
Authored: Mon Apr 17 15:02:33 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Apr 18 11:34:09 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPConnectionCallback.java     |  6 ++++++
 .../amqp/proton/AMQPConnectionContext.java      |  8 +++++++-
 .../amqp/proton/handler/EventHandler.java       |  3 +++
 .../amqp/proton/handler/ProtonHandler.java      | 20 +++++++++++++++++++-
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 29a4df3..31bec9a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -156,6 +157,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
       connection.write(new ChannelBufferWrapper(byteBuf, true));
    }
 
+   public boolean isWritable(ReadyListener readyListener) {
+      return connection.isWritable(readyListener);
+   }
+
+
    public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
       return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 0173631..4a46a8a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -90,7 +91,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
       this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
-      this.handler = new ProtonHandler();
+      this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor());
       handler.addEventHandler(this);
       Transport transport = handler.getTransport();
       transport.setEmitFlowEventOnSend(false);
@@ -333,6 +334,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
    }
 
    @Override
+   public boolean flowControl(ReadyListener readyListener) {
+      return connectionCallback.isWritable(readyListener);
+   }
+
+   @Override
    public void onRemoteOpen(Connection connection) throws Exception {
       lock();
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
index 0ed1723..c8ba136 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.protocol.amqp.proton.handler;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
@@ -78,4 +79,6 @@ public interface EventHandler {
 
    void pushBytes(ByteBuf bytes);
 
+   boolean flowControl(ReadyListener readyListener);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/807e4e5d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index f1be934..e3cb730 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -29,6 +30,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -71,14 +73,23 @@ public class ProtonHandler extends ProtonInitializable {
 
    protected boolean receivedFirstPacket = false;
 
+   private final Executor flushExecutor;
+
+   protected final ReadyListener readyListener;
+
    boolean inDispatch = false;
 
-   public ProtonHandler() {
+   public ProtonHandler(Executor flushExecutor) {
+      this.flushExecutor = flushExecutor;
+      this.readyListener = () -> flushExecutor.execute(() -> {
+         flush();
+      });
       this.creationTime = System.currentTimeMillis();
       transport.bind(connection);
       connection.collect(collector);
    }
 
+
    public long tick(boolean firstTick) {
       lock.lock();
       try {
@@ -161,6 +172,13 @@ public class ProtonHandler extends ProtonInitializable {
    }
 
    public void flushBytes() {
+
+      for (EventHandler handler : handlers) {
+         if (!handler.flowControl(readyListener)) {
+            return;
+         }
+      }
+
       lock.lock();
       try {
          while (true) {

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[3/6] activemq-artemis git commit: ARTEMIS-1117 Improving IO Resilience Part II

jbertram
ARTEMIS-1117 Improving IO Resilience Part II

https://issues.apache.org/jira/browse/ARTEMIS-1117


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0a0955d0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0a0955d0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0a0955d0

Branch: refs/heads/master
Commit: 0a0955d0ccfd948bf9aa8c4c13a46ac92d4e9463
Parents: 23ba3e2
Author: Clebert Suconic <[hidden email]>
Authored: Thu Apr 13 09:04:34 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Apr 18 11:34:09 2017 -0400

----------------------------------------------------------------------
 .../journal/impl/JournalFilesRepository.java    | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0a0955d0/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
index 8440d93..c0a278d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
@@ -90,6 +90,7 @@ public class JournalFilesRepository {
             pushOpenedFile();
          } catch (Exception e) {
             ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
+            fileFactory.onIOError(e, "unable to open ", null);
          }
       }
    };
@@ -412,21 +413,35 @@ public class JournalFilesRepository {
          logger.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
       }
 
-      if (openFilesExecutor == null) {
-         pushOpenRunnable.run();
-      } else {
-         openFilesExecutor.execute(pushOpenRunnable);
-      }
+      // First try to get an open file, that's prepared and already open
+      JournalFile nextFile = openedFiles.poll();
 
-      JournalFile nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
       if (nextFile == null) {
-         fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null);
-         // We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
-         // If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver
-         fileFactory.activateBuffer(journal.getCurrentFile().getFile());
-         throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
+         // if there's none, push to open
+
+         pushOpen();
+
+         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+      }
+
+      if (openedFiles.isEmpty()) {
+         // if empty, push to open one.
+         pushOpen();
       }
 
+      if (nextFile == null) {
+
+         logger.debug("Could not get a file in 5 seconds, it will retry directly, without an executor");
+         try {
+            nextFile = takeFile(true, true, true, false);
+         } catch (Exception e) {
+            fileFactory.onIOError(e, "unable to open ", null);
+            // We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
+            // If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver
+            fileFactory.activateBuffer(journal.getCurrentFile().getFile());
+            throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
+         }
+      }
       if (logger.isTraceEnabled()) {
          logger.trace("Returning file " + nextFile);
       }
@@ -434,6 +449,14 @@ public class JournalFilesRepository {
       return nextFile;
    }
 
+   private void pushOpen() {
+      if (openFilesExecutor == null) {
+         pushOpenRunnable.run();
+      } else {
+         openFilesExecutor.execute(pushOpenRunnable);
+      }
+   }
+
    /**
     * Open a file and place it into the openedFiles queue
     */

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/6] activemq-artemis git commit: ARTEMIS-1117 Improving IO Failure resilience Part I

jbertram
In reply to this post by jbertram
ARTEMIS-1117 Improving IO Failure resilience Part I

Francesco and I (Clebert) worked independently here.
I am keeping Francesco's changes in a separate commit.

https://issues.apache.org/jira/browse/ARTEMIS-1117


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/23ba3e27
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/23ba3e27
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/23ba3e27

Branch: refs/heads/master
Commit: 23ba3e27d90b85240f7181b5d792848727fdf172
Parents: 7d5511c
Author: Francesco Nigro <[hidden email]>
Authored: Thu Apr 13 17:48:00 2017 +0200
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Apr 18 11:34:09 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/io/aio/AIOSequentialFile.java  | 38 +++++++++++++++-----
 1 file changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/23ba3e27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index f641aec..9d3a824 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.io.aio;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
 import java.nio.ByteBuffer;
 import java.util.PriorityQueue;
 import java.util.concurrent.Executor;
@@ -100,16 +102,34 @@ public class AIOSequentialFile extends AbstractSequentialFile {
 
       super.close();
 
-      if (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
-         factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
-      }
-
-      opened = false;
-
-      timedBuffer = null;
+      final String fileName = this.getFileName();
+      try {
+         int waitCount = 0;
+         while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
+            waitCount++;
+            if (waitCount == 1) {
+               final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
+               for (ThreadInfo threadInfo : threads) {
+                  ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
+               }
+               factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
+            }
+            ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
+         }
+      } catch (InterruptedException e) {
+         ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
+         throw e;
+      } finally {
+
+         opened = false;
+
+         timedBuffer = null;
+
+         aioFile.close();
+
+         aioFile = null;
 
-      aioFile.close();
-      aioFile = null;
+      }
    }
 
    @Override

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[4/6] activemq-artemis git commit: ARTEMIS-1118 IO callbacks on AMQP

jbertram
In reply to this post by jbertram
ARTEMIS-1118 IO callbacks on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/31d78edd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/31d78edd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/31d78edd

Branch: refs/heads/master
Commit: 31d78eddf1e74b5b846442d0738ccbf1aa42e7d3
Parents: 807e4e5
Author: Clebert Suconic <[hidden email]>
Authored: Mon Apr 17 23:21:43 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Apr 18 11:49:25 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/utils/RunnableEx.java      |  22 +++
 .../amqp/broker/AMQPSessionCallback.java        | 168 +++++++++++--------
 .../client/AMQPClientConnectionFactory.java     |   2 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  18 +-
 .../transaction/ProtonTransactionHandler.java   |  62 ++++---
 5 files changed, 181 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java
new file mode 100644
index 0000000..426cfa2
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+public interface RunnableEx {
+   void run() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 9e54d41..08ea959 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -53,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.RunnableEx;
 import org.apache.activemq.artemis.utils.SelectorTranslator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -78,6 +80,8 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final ProtonProtocolManager manager;
 
+   private final StorageManager storageManager;
+
    private final AMQPConnectionContext connection;
 
    private final Connection transportConnection;
@@ -100,6 +104,7 @@ public class AMQPSessionCallback implements SessionCallback {
                               OperationContext operationContext) {
       this.protonSPI = protonSPI;
       this.manager = manager;
+      this.storageManager = manager.getServer().getStorageManager();
       this.connection = connection;
       this.transportConnection = transportConnection;
       this.closeExecutor = executor;
@@ -134,6 +139,24 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   public void withinContext(RunnableEx run) throws Exception {
+      OperationContext context = recoverContext();
+      try {
+         run.run();
+      } finally {
+         resetContext(context);
+      }
+   }
+
+   public void afterIO(IOCallback ioCallback) {
+      OperationContext context = recoverContext();
+      try {
+         manager.getServer().getStorageManager().afterCompleteOperations(ioCallback);
+      } finally {
+         resetContext(context);
+      }
+   }
+
    @Override
    public void browserFinished(ServerConsumer consumer) {
 
@@ -315,11 +338,11 @@ public class AMQPSessionCallback implements SessionCallback {
    public void close() throws Exception {
       //need to check here as this can be called if init fails
       if (serverSession != null) {
-         recoverContext();
+         OperationContext context = recoverContext();
          try {
             serverSession.close(false);
          } finally {
-            resetContext();
+            resetContext(context);
          }
       }
    }
@@ -328,30 +351,30 @@ public class AMQPSessionCallback implements SessionCallback {
       if (transaction == null) {
          transaction = serverSession.getCurrentTransaction();
       }
-      recoverContext();
+      OperationContext oldContext = recoverContext();
       try {
          ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
       } finally {
-         resetContext();
+         resetContext(oldContext);
       }
    }
 
    public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
-      recoverContext();
+      OperationContext oldContext = recoverContext();
       try {
          ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
          ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
       } finally {
-         resetContext();
+         resetContext(oldContext);
       }
    }
 
    public void reject(Object brokerConsumer, Message message) throws Exception {
-      recoverContext();
+      OperationContext oldContext = recoverContext();
       try {
          ((ServerConsumer) brokerConsumer).reject(message.getMessageID());
       } finally {
-         resetContext();
+         resetContext(oldContext);
       }
    }
 
@@ -380,22 +403,26 @@ public class AMQPSessionCallback implements SessionCallback {
          }
       }
 
-      recoverContext();
+      OperationContext oldcontext = recoverContext();
 
-      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
-      if (store.isRejectingMessages()) {
-         // We drop pre-settled messages (and abort any associated Tx)
-         if (delivery.remotelySettled()) {
-            if (transaction != null) {
-               String amqpAddress = delivery.getLink().getTarget().getAddress();
-               ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
-               transaction.markAsRollbackOnly(e);
+      try {
+         PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
+         if (store.isRejectingMessages()) {
+            // We drop pre-settled messages (and abort any associated Tx)
+            if (delivery.remotelySettled()) {
+               if (transaction != null) {
+                  String amqpAddress = delivery.getLink().getTarget().getAddress();
+                  ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
+                  transaction.markAsRollbackOnly(e);
+               }
+            } else {
+               rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
             }
          } else {
-            rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
+            serverSend(transaction, message, delivery, receiver);
          }
-      } else {
-         serverSend(transaction, message, delivery, receiver);
+      } finally {
+         resetContext(oldcontext);
       }
    }
 
@@ -406,61 +433,67 @@ public class AMQPSessionCallback implements SessionCallback {
       Rejected rejected = new Rejected();
       rejected.setError(condition);
 
-      connection.lock();
-      try {
-         delivery.disposition(rejected);
-         delivery.settle();
-      } finally {
-         connection.unlock();
-      }
-      connection.flush();
+      afterIO(new IOCallback() {
+         @Override
+         public void done() {
+            connection.lock();
+            try {
+               delivery.disposition(rejected);
+               delivery.settle();
+            } finally {
+               connection.unlock();
+            }
+            connection.flush();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+
+         }
+      });
+
    }
 
    private void serverSend(final Transaction transaction,
                            final Message message,
                            final Delivery delivery,
                            final Receiver receiver) throws Exception {
-      try {
-
-         message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
+      message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
 
-         serverSession.send(transaction, message, false, false);
+      serverSession.send(transaction, message, false, false);
 
-         manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
-            @Override
-            public void done() {
-               connection.lock();
-               try {
-                  if (delivery.getRemoteState() instanceof TransactionalState) {
-                     TransactionalState txAccepted = new TransactionalState();
-                     txAccepted.setOutcome(Accepted.getInstance());
-                     txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
-
-                     delivery.disposition(txAccepted);
-                  } else {
-                     delivery.disposition(Accepted.getInstance());
-                  }
-                  delivery.settle();
-               } finally {
-                  connection.unlock();
+      afterIO(new IOCallback() {
+         @Override
+         public void done() {
+            connection.lock();
+            try {
+               if (delivery.getRemoteState() instanceof TransactionalState) {
+                  TransactionalState txAccepted = new TransactionalState();
+                  txAccepted.setOutcome(Accepted.getInstance());
+                  txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
+
+                  delivery.disposition(txAccepted);
+               } else {
+                  delivery.disposition(Accepted.getInstance());
                }
-               connection.flush();
+               delivery.settle();
+            } finally {
+               connection.unlock();
             }
+            connection.flush();
+         }
 
-            @Override
-            public void onError(int errorCode, String errorMessage) {
-               connection.lock();
-               try {
-                  receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
-                  connection.flush();
-               } finally {
-                  connection.unlock();
-               }
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+            connection.lock();
+            try {
+               receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
+               connection.flush();
+            } finally {
+               connection.unlock();
             }
-         });
-      } finally {
-         resetContext();
-      }
+         }
+      });
    }
 
    public void offerProducerCredit(final String address,
@@ -502,12 +535,15 @@ public class AMQPSessionCallback implements SessionCallback {
       manager.getServer().destroyQueue(new SimpleString(queueName));
    }
 
-   private void resetContext() {
-      manager.getServer().getStorageManager().setContext(null);
+   public void resetContext(OperationContext oldContext) {
+      storageManager.setContext(oldContext);
    }
 
-   private void recoverContext() {
+   public OperationContext recoverContext() {
+
+      OperationContext oldContext = storageManager.getContext();
       manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
+      return oldContext;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index 441f3a6..6aa8fda 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ccc93b7..4d8bf53 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.Consumer;
@@ -486,6 +488,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          return;
       }
 
+      OperationContext oldContext = sessionSPI.recoverContext();
+
       try {
          Message message = ((MessageReference) delivery.getContext()).getMessage();
 
@@ -590,7 +594,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             // todo not sure if we need to do anything here
          }
       } finally {
-         connection.flush();
+         sessionSPI.afterIO(new IOCallback() {
+            @Override
+            public void done() {
+               connection.flush();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+               connection.flush();
+            }
+         });
+
+         sessionSPI.resetContext(oldContext);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31d78edd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 4579f1c..bf2e575 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 
 import java.nio.ByteBuffer;
 
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -118,24 +119,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true);
             tx.discharge();
 
-            if (discharge.getFail()) {
-               tx.rollback();
-               connection.lock();
-               try {
-                  delivery.disposition(new Accepted());
-               } finally {
-                  connection.unlock();
+            IOCallback ioAction = new IOCallback() {
+               @Override
+               public void done() {
+                  connection.lock();
+                  try {
+                     delivery.disposition(new Accepted());
+                  } finally {
+                     connection.unlock();
+                  }
                }
-               connection.flush();
-            } else {
-               tx.commit();
-               connection.lock();
-               try {
-                  delivery.disposition(new Accepted());
-               } finally {
-                  connection.unlock();
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+
                }
-               connection.flush();
+            };
+
+            if (discharge.getFail()) {
+               sessionSPI.withinContext(() -> tx.rollback());
+               sessionSPI.afterIO(ioAction);
+            } else {
+               sessionSPI.withinContext(() -> tx.commit());
+               sessionSPI.afterIO(ioAction);
             }
          }
       } catch (ActiveMQAMQPException amqpE) {
@@ -157,13 +163,23 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
          }
          connection.flush();
       } finally {
-         connection.lock();
-         try {
-            delivery.settle();
-         } finally {
-            connection.unlock();
-         }
-         connection.flush();
+         sessionSPI.afterIO(new IOCallback() {
+            @Override
+            public void done() {
+               connection.lock();
+               try {
+                  delivery.settle();
+               } finally {
+                  connection.unlock();
+               }
+               connection.flush();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+
+            }
+         });
       }
    }
 

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[5/6] activemq-artemis git commit: ARTEMIS-1121 Improving expiry scanner

jbertram
In reply to this post by jbertram
ARTEMIS-1121 Improving expiry scanner

https://issues.apache.org/jira/browse/ARTEMIS-1121


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1a397724
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1a397724
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1a397724

Branch: refs/heads/master
Commit: 1a397724898259d5451cd441f5f6b301d2c8571b
Parents: 31d78ed
Author: Clebert Suconic <[hidden email]>
Authored: Tue Apr 18 10:29:29 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Apr 18 11:49:25 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       |   3 +-
 .../core/paging/impl/PagingStoreImpl.java       |   2 +-
 .../core/server/ActiveMQServerLogger.java       |   6 +-
 .../artemis/core/server/impl/QueueImpl.java     |  46 ++++-
 tests/smoke-tests/pom.xml                       |  17 ++
 .../main/resources/servers/expire/broker.xml    | 184 +++++++++++++++++++
 .../tests/smoke/expire/TestSimpleExpire.java    |  92 ++++++++++
 7 files changed, 338 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d627fd5..d447ecd 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -633,7 +633,8 @@ public class AMQPMessage extends RefCountMessage {
 
    private synchronized void checkBuffer() {
       if (!bufferValid) {
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
+         int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
          try {
             getProtonMessage().encode(new NettyWritable(buffer));
             byte[] bytes = new byte[buffer.writerIndex()];

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 03f53c6..6486ec9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -683,7 +683,7 @@ public class PagingStoreImpl implements PagingStore {
                   if (pagingManager.isDiskFull()) {
                      ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
                   } else {
-                     ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
+                     ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
                   }
                   blocking.set(true);
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 9aaba7c..48507a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -944,7 +944,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void errorExpiringReferencesNoBindings(SimpleString expiryAddress);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222147, value = "Message has expired. No expiry queue configured for queue {0} so dropping it", format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 222147, value = "Messages are being expired on queue{0}. However there is no expiry queue configured, hence messages will be dropped.", format = Message.Format.MESSAGE_FORMAT)
    void errorExpiringReferencesNoQueue(SimpleString name);
 
    @LogMessage(level = Logger.Level.WARN)
@@ -1104,8 +1104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void missingClusterConfigForScaleDown(String scaleDownCluster);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}", format = Message.Format.MESSAGE_FORMAT)
-   void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize);
+   @Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes on address: {2}, global-max-size is {3}", format = Message.Format.MESSAGE_FORMAT)
+   void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize, long globalMaxSize);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222184,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index dc4b090..a62ae79 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -146,6 +146,8 @@ public class QueueImpl implements Queue {
 
    private final LinkedListIterator<PagedReference> pageIterator;
 
+   private volatile boolean printErrorExpiring = false;
+
    // Messages will first enter intermediateMessageReferences
    // Before they are added to messageReferences
    // This is to avoid locking the queue on the producer
@@ -1567,27 +1569,52 @@ public class QueueImpl implements Queue {
             if (queueDestroyed) {
                return;
             }
+            logger.debug("Scanning for expires on " + QueueImpl.this.getName());
 
             LinkedListIterator<MessageReference> iter = iterator();
 
+            boolean expired = false;
+            boolean hasElements = false;
+
+            int elementsExpired = 0;
             try {
-               boolean expired = false;
-               boolean hasElements = false;
+               Transaction tx = null;
+
                while (postOffice.isStarted() && iter.hasNext()) {
                   hasElements = true;
                   MessageReference ref = iter.next();
                   try {
                      if (ref.getMessage().isExpired()) {
+                        if (tx == null) {
+                           tx = new TransactionImpl(storageManager);
+                        }
                         incDelivering();
                         expired = true;
-                        expire(ref);
+                        expire(tx, ref);
                         iter.remove();
                         refRemoved(ref);
+
+                        if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
+                           logger.debug("Breaking loop of expiring");
+                           scannerRunning.incrementAndGet();
+                           getExecutor().execute(this);
+                           break;
+                        }
                      }
+
                   } catch (Exception e) {
                      ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
                   }
+               }
 
+               logger.debug("Expired " + elementsExpired + " references");
+
+               try {
+                  if (tx != null) {
+                     tx.commit();
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
                }
 
                // If empty we need to schedule depaging to make sure we would depage expired messages as well
@@ -1600,6 +1627,8 @@ public class QueueImpl implements Queue {
                } catch (Throwable ignored) {
                }
                scannerRunning.decrementAndGet();
+               logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
+
             }
          }
       }
@@ -1912,7 +1941,6 @@ public class QueueImpl implements Queue {
       return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
    }
 
-
    private synchronized void internalAddTail(final MessageReference ref) {
       refAdded(ref);
       messageReferences.addTail(ref, getPriority(ref));
@@ -2519,7 +2547,11 @@ public class QueueImpl implements Queue {
             move(expiryAddress, tx, ref, true, true);
          }
       } else {
-         ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
+         if (!printErrorExpiring) {
+            printErrorExpiring = true;
+            // print this only once
+            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
+         }
 
          acknowledge(tx, ref);
       }
@@ -3015,7 +3047,7 @@ public class QueueImpl implements Queue {
             if (messagesIterator != null && messagesIterator.hasNext()) {
                MessageReference msg = messagesIterator.next();
                if (msg.isPaged()) {
-                  previouslyBrowsed.add(((PagedReference)msg).getPosition());
+                  previouslyBrowsed.add(((PagedReference) msg).getPosition());
                }
                return msg;
             } else {
@@ -3156,7 +3188,7 @@ public class QueueImpl implements Queue {
          if (consumersSet.size() == 0) {
             logger.debug("There are no consumers, no need to check slow consumer's rate");
             return;
-         } else if (queueRate  < (threshold * consumersSet.size())) {
+         } else if (queueRate < (threshold * consumersSet.size())) {
             if (logger.isDebugEnabled()) {
                logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 4b06386..4734706 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -172,6 +172,23 @@
                      <configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
                   </configuration>
                </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-expire</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <configuration>${basedir}/target/classes/servers/expire</configuration>
+                     <javaOptions>-Dartemis.debug.paging.interval=1</javaOptions>
+                     <allowAnonymous>true</allowAnonymous>
+                     <user>admin</user>
+                     <password>admin</password>
+                     <instance>${basedir}/target/expire</instance>
+                  </configuration>
+               </execution>
+
             </executions>
             <dependencies>
                <dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
new file mode 100644
index 0000000..a4176f8
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
@@ -0,0 +1,184 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <!--
+        You can specify the NIC to use when verifying that the network is reachable:
+         <network-check-NIC>theNickName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
+                    You can list multiple IPs; a successful ping to any one of them is enough for the server to continue running. -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often, in milliseconds, the broker checks how much disk space is in use -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in memory
+
+            The system will use half of the available memory (-Xmx) by default for the global-max-size.
+            You may specify a different value here if you need to customize it to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+
+         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
+         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
+
+         <!-- STOMP Acceptor. -->
+         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
+         <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+         <!-- MQTT Acceptor -->
+         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+      </acceptors>
+
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+
+      </addresses>
+
+   </core>
+</configuration>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
new file mode 100644
index 0000000..40bd6fc
--- /dev/null
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.expire;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSimpleExpire extends SmokeTestBase {
+   // Smoke test for message expiry over AMQP: short-TTL messages are sent first, then the later "ok" messages must be consumed.
+   public static final String SERVER_NAME_0 = "expire";
+   // Per-test setup: clean the "expire" server's data, disable the check thread, then start the server.
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      disableCheckThread();
+      startServer(SERVER_NAME_0, 0, 30000); // NOTE(review): presumably (name, port offset, timeout ms) -- confirm in SmokeTestBase
+   }
+   // Sends 20000 messages with a 1000 ms TTL, sleeps 5 s, sends 500 "ok" messages with TTL 0, then expects the first 500 received messages to all be "ok".
+   @Test
+   public void testSendExpire() throws Exception {
+      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      Queue queue = session.createQueue("q0");
+      MessageProducer producer = session.createProducer(queue);
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+      // First batch: every message is sent with a 1000 ms time-to-live.
+      producer.setTimeToLive(1000);
+      for (int i = 0; i < 20000; i++) {
+         producer.send(session.createTextMessage("expired"));
+         if (i % 5000 == 0) { // also true at i == 0, so the first commit covers a single message
+            session.commit();
+            System.out.println("Sent " + i + " + messages"); // NOTE(review): the literal " + messages" prints a stray "+"
+         }
+
+      }
+      // Commit whatever was sent since the last in-loop commit.
+      session.commit();
+      // Sleep 5000 ms, longer than the 1000 ms TTL set above, before sending the second batch.
+      Thread.sleep(5000);
+      producer.setTimeToLive(0); // JMS: a time-to-live of 0 means the message does not expire
+      for (int i = 0; i < 500; i++) {
+         producer.send(session.createTextMessage("ok"));
+
+      }
+      session.commit();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+      // Each of the next 500 receives must return a message within 10000 ms, and its text must be "ok".
+
+      for (int i = 0; i < 500; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(10000);
+         Assert.assertNotNull(txt);
+         Assert.assertEquals("ok", txt.getText());
+      }
+
+      session.commit();
+
+      // NOTE(review): the connection opened above is not closed anywhere in this method.
+
+   }
+
+}

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[6/6] activemq-artemis git commit: This closes #1207

jbertram
In reply to this post by jbertram
This closes #1207


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1f82c783
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1f82c783
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1f82c783

Branch: refs/heads/master
Commit: 1f82c783a74c5a4406b3ba0cca35b5ec5d1216c8
Parents: 7d5511c 1a39772
Author: Justin Bertram <[hidden email]>
Authored: Tue Apr 18 12:06:10 2017 -0500
Committer: Justin Bertram <[hidden email]>
Committed: Tue Apr 18 12:06:10 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/utils/RunnableEx.java      |  22 +++
 .../artemis/core/io/aio/AIOSequentialFile.java  |  38 +++-
 .../journal/impl/JournalFilesRepository.java    |  45 +++--
 .../amqp/broker/AMQPConnectionCallback.java     |   6 +
 .../protocol/amqp/broker/AMQPMessage.java       |   3 +-
 .../amqp/broker/AMQPSessionCallback.java        | 168 ++++++++++-------
 .../client/AMQPClientConnectionFactory.java     |   2 +-
 .../amqp/proton/AMQPConnectionContext.java      |   8 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  18 +-
 .../amqp/proton/handler/EventHandler.java       |   3 +
 .../amqp/proton/handler/ProtonHandler.java      |  20 +-
 .../transaction/ProtonTransactionHandler.java   |  62 ++++---
 .../core/paging/impl/PagingStoreImpl.java       |   2 +-
 .../core/server/ActiveMQServerLogger.java       |   6 +-
 .../artemis/core/server/impl/QueueImpl.java     |  46 ++++-
 tests/smoke-tests/pom.xml                       |  17 ++
 .../main/resources/servers/expire/broker.xml    | 184 +++++++++++++++++++
 .../tests/smoke/expire/TestSimpleExpire.java    |  92 ++++++++++
 18 files changed, 617 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


Loading...