[1/2] activemq-artemis git commit: ARTEMIS-1110 Cleanup Transaction Coordinator buffer handling

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/2] activemq-artemis git commit: ARTEMIS-1110 Cleanup Transaction Coordinator buffer handling

clebertsuconic-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 62096f975 -> f282dff57


ARTEMIS-1110 Cleanup Transaction Coordinator buffer handling

Reuse a single small buffer for all txn commands (declare / discharge) to
avoid creating lots of small arrays and ByteBuffer wrappers for txn operations.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d2731fa0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d2731fa0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d2731fa0

Branch: refs/heads/master
Commit: d2731fa0e1d66831ad95011ff00a76727bd7d5e8
Parents: 62096f9
Author: Timothy Bish <[hidden email]>
Authored: Tue Apr 11 11:16:26 2017 -0400
Committer: Timothy Bish <[hidden email]>
Committed: Tue Apr 11 11:33:58 2017 -0400

----------------------------------------------------------------------
 .../transaction/ProtonTransactionHandler.java   | 53 +++++++++++++-------
 .../protocol/amqp/util/DeliveryUtil.java        | 30 -----------
 2 files changed, 35 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d2731fa0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index a3dae25..f817ed4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 
+import java.nio.ByteBuffer;
+
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
-import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -32,6 +33,7 @@ import org.apache.qpid.proton.amqp.transaction.Discharge;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.jboss.logging.Logger;
 
@@ -48,6 +50,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    final AMQPSessionCallback sessionSPI;
    final AMQPConnectionContext connection;
 
+   private final ByteBuffer DECODE_BUFFER = ByteBuffer.allocate(64);
+
    public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
       this.sessionSPI = sessionSPI;
       this.connection = connection;
@@ -65,7 +69,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             return;
          }
 
-         byte[] buffer;
+         ByteBuffer buffer;
+         MessageImpl msg;
 
          synchronized (connection.getLock()) {
             // Replenish coordinator receiver credit on exhaustion so sender can continue
@@ -74,14 +79,22 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
                receiver.flow(amqpCredit);
             }
 
-            buffer = new byte[delivery.available()];
-            receiver.recv(buffer, 0, buffer.length);
-            receiver.advance();
-         }
+            // Declare is generally 7 bytes and discharge is around 48 depending on the
+            // encoded size of the TXN ID.  Decode buffer has a bit of extra space but if
+            // the incoming request is too big just use a scratch buffer.
+            if (delivery.available() > DECODE_BUFFER.capacity()) {
+               buffer = ByteBuffer.allocate(delivery.available());
+            } else {
+               buffer = (ByteBuffer) DECODE_BUFFER.clear();
+            }
 
+            // Update Buffer for the next incoming command.
+            buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity()));
 
+            receiver.advance();
 
-         MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
+            msg = decodeMessage(buffer);
+         }
 
          Object action = ((AmqpValue) msg.getBody()).getValue();
 
@@ -133,26 +146,30 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
       }
    }
 
-   private Rejected createRejected(Symbol amqpError, String message) {
-      Rejected rejected = new Rejected();
-      ErrorCondition condition = new ErrorCondition();
-      condition.setCondition(amqpError);
-      condition.setDescription(message);
-      rejected.setError(condition);
-      return rejected;
-   }
-
    @Override
    public void onFlow(int credits, boolean drain) {
    }
 
    @Override
    public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
-      // no op
    }
 
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
-      // no op
+   }
+
+   private Rejected createRejected(Symbol amqpError, String message) {
+      Rejected rejected = new Rejected();
+      ErrorCondition condition = new ErrorCondition();
+      condition.setCondition(amqpError);
+      condition.setDescription(message);
+      rejected.setError(condition);
+      return rejected;
+   }
+
+   private MessageImpl decodeMessage(ByteBuffer encoded) {
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      message.decode(encoded);
+      return message;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d2731fa0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
deleted file mode 100644
index 4267b85..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.util;
-
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-
-public class DeliveryUtil {
-
-   public static MessageImpl decodeMessageImpl(byte[] data) {
-      MessageImpl message = (MessageImpl) Message.Factory.create();
-      message.decode(data, 0, data.length);
-      return message;
-   }
-
-}

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/2] activemq-artemis git commit: This closes #1197

clebertsuconic-2
This closes #1197


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f282dff5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f282dff5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f282dff5

Branch: refs/heads/master
Commit: f282dff57c9f27e955e54e5fff25eeeb5467e7a3
Parents: 62096f9 d2731fa
Author: Clebert Suconic <[hidden email]>
Authored: Tue Apr 11 11:48:30 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Tue Apr 11 11:48:30 2017 -0400

----------------------------------------------------------------------
 .../transaction/ProtonTransactionHandler.java   | 53 +++++++++++++-------
 .../protocol/amqp/util/DeliveryUtil.java        | 30 -----------
 2 files changed, 35 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


Loading...