[1/2] activemq-artemis git commit: ARTEMIS-1098 Improve flow control while streaming large messages

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/2] activemq-artemis git commit: ARTEMIS-1098 Improve flow control while streaming large messages

martyntaylor
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x e82ef026b -> 75994b5fb


ARTEMIS-1098 Improve flow control while streaming large messages

(cherry picked from commit c6d24e9073d0793350c24234f85f8d4532d250ff)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c336328
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c336328
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c336328

Branch: refs/heads/1.x
Commit: 3c336328825cc02a9ee638490b88b04244009e35
Parents: e82ef02
Author: Francesco Nigro <[hidden email]>
Authored: Mon Apr 10 13:47:54 2017 +0200
Committer: Martyn Taylor <[hidden email]>
Committed: Tue Apr 11 10:47:46 2017 +0100

----------------------------------------------------------------------
 .../core/client/ActiveMQClientLogger.java       |  5 ++
 .../core/impl/ActiveMQSessionContext.java       | 57 ++++++++++++--------
 2 files changed, 40 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c336328/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index 0fe4a5a..748e508 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void broadcastGroupBindError(String hostAndPort);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void timeoutStreamingLargeMessage();
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
    void onMessageError(@Cause Throwable e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c336328/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 56c7135..d355253 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -438,17 +439,7 @@ public class ActiveMQSessionContext extends SessionContext {
                                     byte[] chunk,
                                     int reconnectID,
                                     SendAcknowledgementHandler messageHandler) throws ActiveMQException {
-      final boolean requiresResponse = lastChunk && sendBlocking;
-      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
-
-      if (requiresResponse) {
-         // When sending it blocking, only the last chunk will be blocking.
-         sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
-      } else {
-         sessionChannel.send(chunkPacket, reconnectID);
-      }
-
-      return chunkPacket.getPacketSize();
+      return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
    }
 
    @Override
@@ -458,17 +449,7 @@ public class ActiveMQSessionContext extends SessionContext {
                                           boolean lastChunk,
                                           byte[] chunk,
                                           SendAcknowledgementHandler messageHandler) throws ActiveMQException {
-      final boolean requiresResponse = lastChunk && sendBlocking;
-      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
-
-      if (requiresResponse) {
-         // When sending it blocking, only the last chunk will be blocking.
-         sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
-      } else {
-         sessionChannel.send(chunkPacket);
-      }
-
-      return chunkPacket.getPacketSize();
+      return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
    }
 
    @Override
@@ -770,6 +751,38 @@ public class ActiveMQSessionContext extends SessionContext {
       }
    }
 
+   private static int sendSessionSendContinuationMessage(Channel channel,
+                                                         MessageInternal msgI,
+                                                         long messageBodySize,
+                                                         boolean sendBlocking,
+                                                         boolean lastChunk,
+                                                         byte[] chunk,
+                                                         SendAcknowledgementHandler messageHandler) throws ActiveMQException {
+      final boolean requiresResponse = lastChunk && sendBlocking;
+      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+      final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
+      //perform a weak form of flow control to avoid OOM on tight loops
+      final CoreRemotingConnection connection = channel.getConnection();
+      final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
+      final long startFlowControl = System.nanoTime();
+      final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
+      if (!isWritable) {
+         final long endFlowControl = System.nanoTime();
+         final long elapsedFlowControl = endFlowControl - startFlowControl;
+         final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
+         ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
+         logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
+      }
+      if (requiresResponse) {
+         // When sending it blocking, only the last chunk will be blocking.
+         channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+      } else {
+         channel.send(chunkPacket);
+      }
+      return chunkPacket.getPacketSize();
+   }
+
+
    class ClientSessionPacketHandler implements ChannelHandler {
 
       @Override

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/2] activemq-artemis git commit: This closes #1195

martyntaylor
This closes #1195


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/75994b5f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/75994b5f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/75994b5f

Branch: refs/heads/1.x
Commit: 75994b5fb2e9d4824b23c98f7159f7ee0713d8ea
Parents: e82ef02 3c33632
Author: Martyn Taylor <[hidden email]>
Authored: Tue Apr 11 10:47:47 2017 +0100
Committer: Martyn Taylor <[hidden email]>
Committed: Tue Apr 11 10:47:47 2017 +0100

----------------------------------------------------------------------
 .../core/client/ActiveMQClientLogger.java       |  5 ++
 .../core/impl/ActiveMQSessionContext.java       | 57 ++++++++++++--------
 2 files changed, 40 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


Loading...