Quantcast

[1/3] activemq-artemis git commit: ARTEMIS-1045 Performance improvements on AMQP

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/3] activemq-artemis git commit: ARTEMIS-1045 Performance improvements on AMQP

tabish
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 861c23155 -> 224d78062


ARTEMIS-1045 Performance improvements on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/291a4719
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/291a4719
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/291a4719

Branch: refs/heads/master
Commit: 291a4719b6b114b1452a272fd13393262f736a05
Parents: 861c231
Author: Clebert Suconic <[hidden email]>
Authored: Fri Mar 17 11:14:18 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Mar 17 16:11:14 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 68 +++++++++++---------
 .../amqp/proton/ProtonServerSenderContext.java  | 13 ----
 2 files changed, 39 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/291a4719/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 60aae4c..653ee5f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -72,6 +72,7 @@ public class AMQPMessage extends RefCountMessage {
    private DeliveryAnnotations _deliveryAnnotations;
    private MessageAnnotations _messageAnnotations;
    private Properties _properties;
+   private int appLocation = -1;
    private ApplicationProperties applicationProperties;
    private long scheduledTime = -1;
    private String connectionID;
@@ -93,7 +94,7 @@ public class AMQPMessage extends RefCountMessage {
 
    public AMQPMessage(long messageFormat, Message message) {
       this.messageFormat = messageFormat;
-      this.protonMessage = (MessageImpl)message;
+      this.protonMessage = (MessageImpl) message;
 
    }
 
@@ -124,7 +125,7 @@ public class AMQPMessage extends RefCountMessage {
             _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
             _properties = new Properties();
             this.applicationProperties = new ApplicationProperties(new HashMap<>());
-            this.protonMessage = (MessageImpl)Message.Factory.create();
+            this.protonMessage = (MessageImpl) Message.Factory.create();
             this.protonMessage.setApplicationProperties(applicationProperties);
             this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
          }
@@ -148,6 +149,20 @@ public class AMQPMessage extends RefCountMessage {
 
    private ApplicationProperties getApplicationProperties() {
       parseHeaders();
+
+      if (applicationProperties == null && appLocation >= 0) {
+         ByteBuffer buffer = getBuffer().nioBuffer();
+         buffer.position(appLocation);
+         TLSEncode.getDecoder().setByteBuffer(buffer);
+         Object section = TLSEncode.getDecoder().readObject();
+         if (section instanceof ApplicationProperties) {
+            this.applicationProperties = (ApplicationProperties) section;
+         }
+         this.appLocation = -1;
+         TLSEncode.getDecoder().setByteBuffer(null);
+
+      }
+
       return applicationProperties;
    }
 
@@ -161,6 +176,7 @@ public class AMQPMessage extends RefCountMessage {
          parsedHeaders = true;
       }
    }
+
    @Override
    public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
       this.connectionID = connectionID;
@@ -172,7 +188,6 @@ public class AMQPMessage extends RefCountMessage {
       return connectionID;
    }
 
-
    public MessageAnnotations getMessageAnnotations() {
       parseHeaders();
       return _messageAnnotations;
@@ -202,7 +217,6 @@ public class AMQPMessage extends RefCountMessage {
       return null;
    }
 
-
    private void setSymbol(String symbol, Object value) {
       setSymbol(Symbol.getSymbol(symbol), value);
    }
@@ -231,11 +245,9 @@ public class AMQPMessage extends RefCountMessage {
             return null;
       } */
 
-
       return null;
    }
 
-
    @Override
    public SimpleString getGroupID() {
       parseHeaders();
@@ -247,7 +259,6 @@ public class AMQPMessage extends RefCountMessage {
       }
    }
 
-
    @Override
    public Long getScheduledDeliveryTime() {
 
@@ -339,15 +350,19 @@ public class AMQPMessage extends RefCountMessage {
                this.expiration = _properties.getAbsoluteExpiryTime().getTime();
             }
 
-            if (buffer.hasRemaining()) {
-               section = (Section) decoder.readObject();
-            } else {
-               section = null;
-            }
+            // We don't read the next section on purpose, as we will parse ApplicationProperties
+            // lazily
+            section = null;
          }
 
          if (section instanceof ApplicationProperties) {
             applicationProperties = (ApplicationProperties) section;
+         } else {
+            if (buffer.hasRemaining()) {
+               this.appLocation = buffer.position();
+            } else {
+               this.appLocation = -1;
+            }
          }
       } finally {
          decoder.setByteBuffer(null);
@@ -446,13 +461,11 @@ public class AMQPMessage extends RefCountMessage {
       }
    }
 
-
    @Override
    public Object getDuplicateProperty() {
       return null;
    }
 
-
    @Override
    public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
       return null;
@@ -463,7 +476,7 @@ public class AMQPMessage extends RefCountMessage {
       if (address == null) {
          Properties properties = getProtonMessage().getProperties();
          if (properties != null) {
-            return  properties.getTo();
+            return properties.getTo();
          } else {
             return null;
          }
@@ -539,7 +552,7 @@ public class AMQPMessage extends RefCountMessage {
             header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1));
             TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
             TLSEncode.getEncoder().writeObject(header);
-            TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
+            TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
          }
       }
       buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
@@ -676,27 +689,27 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Boolean)getApplicationPropertiesMap().get(key);
+      return (Boolean) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Byte)getApplicationPropertiesMap().get(key);
+      return (Byte) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Double)getApplicationPropertiesMap().get(key);
+      return (Double) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Integer)getApplicationPropertiesMap().get(key);
+      return (Integer) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Long)getApplicationPropertiesMap().get(key);
+      return (Long) getApplicationPropertiesMap().get(key);
    }
 
    @Override
@@ -712,12 +725,12 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Short)getApplicationPropertiesMap().get(key);
+      return (Short) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
-      return (Float)getApplicationPropertiesMap().get(key);
+      return (Float) getApplicationPropertiesMap().get(key);
    }
 
    @Override
@@ -727,7 +740,7 @@ public class AMQPMessage extends RefCountMessage {
       } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
          return getConnectionID();
       } else {
-         return (String)getApplicationPropertiesMap().get(key);
+         return (String) getApplicationPropertiesMap().get(key);
       }
    }
 
@@ -747,7 +760,7 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
-      return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key));
+      return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
    }
 
    @Override
@@ -842,8 +855,7 @@ public class AMQPMessage extends RefCountMessage {
    @Override
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
-         memoryEstimate = memoryOffset +
-            (data != null ? data.capacity() : 0);
+         memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
       }
 
       return memoryEstimate;
@@ -858,7 +870,6 @@ public class AMQPMessage extends RefCountMessage {
       }
    }
 
-
    @Override
    public SimpleString getReplyTo() {
       if (getProperties() != null) {
@@ -877,7 +888,6 @@ public class AMQPMessage extends RefCountMessage {
       return this;
    }
 
-
    @Override
    public int getPersistSize() {
       checkBuffer();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/291a4719/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 962110e..0e0447f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -42,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
-import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -89,7 +88,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private boolean multicast;
    //todo get this from somewhere
    private RoutingType defaultRoutingType = RoutingType.ANYCAST;
-   protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
    private RoutingType routingTypeToUse = defaultRoutingType;
    private boolean shared = false;
    private boolean global = false;
@@ -110,7 +108,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    @Override
    public void onFlow(int currentCredits, boolean drain) {
-      this.creditsSemaphore.setCredits(currentCredits);
       sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
    }
 
@@ -590,16 +587,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          return 0;
       }
 
-      if (!creditsSemaphore.tryAcquire()) {
-         try {
-            creditsSemaphore.acquire();
-         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            // nothing to be done here.. we just keep going
-            throw new IllegalStateException(e.getMessage(), e);
-         }
-      }
-
       // presettle means we can settle the message on the dealer side before we send it, i.e.
       // for browsers
       boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[3/3] activemq-artemis git commit: This closes #1102

tabish
This closes #1102


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/224d7806
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/224d7806
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/224d7806

Branch: refs/heads/master
Commit: 224d780622be3f65cef027bd4f84078102d25919
Parents: 861c231 1ef4dcf
Author: Timothy Bish <[hidden email]>
Authored: Fri Mar 17 17:28:23 2017 -0400
Committer: Timothy Bish <[hidden email]>
Committed: Fri Mar 17 17:28:23 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPConnectionCallback.java     | 25 ++++--
 .../protocol/amqp/broker/AMQPMessage.java       | 68 ++++++++-------
 .../amqp/broker/AMQPSessionCallback.java        | 30 +++----
 .../amqp/proton/AMQPSessionContext.java         |  2 +-
 .../proton/ProtonServerReceiverContext.java     |  4 +-
 .../amqp/proton/ProtonServerSenderContext.java  | 15 +---
 .../transaction/ProtonTransactionHandler.java   | 78 ++++++++++-------
 .../protocol/amqp/util/DeliveryUtil.java        | 18 +---
 .../integration/amqp/AmqpTransactionTest.java   | 90 +++++++++++++++++++-
 9 files changed, 204 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/3] activemq-artemis git commit: ARTEMIS-1046 Fixing TX eventually stalling with AMQP

tabish
In reply to this post by tabish
ARTEMIS-1046 Fixing TX eventually stalling with AMQP

I have also reviewed the model in which we used transactions


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1ef4dcf7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1ef4dcf7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1ef4dcf7

Branch: refs/heads/master
Commit: 1ef4dcf7d921379618363d035ea8a09794c3637d
Parents: 291a471
Author: Clebert Suconic <[hidden email]>
Authored: Fri Mar 17 15:59:34 2017 -0400
Committer: Clebert Suconic <[hidden email]>
Committed: Fri Mar 17 16:50:56 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPConnectionCallback.java     | 25 ++++--
 .../amqp/broker/AMQPSessionCallback.java        | 30 +++----
 .../amqp/proton/AMQPSessionContext.java         |  2 +-
 .../proton/ProtonServerReceiverContext.java     |  4 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  2 +-
 .../transaction/ProtonTransactionHandler.java   | 78 ++++++++++-------
 .../protocol/amqp/util/DeliveryUtil.java        | 18 +---
 .../integration/amqp/AmqpTransactionTest.java   | 90 +++++++++++++++++++-
 8 files changed, 165 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 7e7dc60..4265f28 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -62,7 +62,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
 
    private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class);
 
-   private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
+   private ConcurrentMap<Binary, Transaction> transactions = new ConcurrentHashMap<>();
 
    private final ProtonProtocolManager manager;
 
@@ -224,25 +224,32 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
 
    public Binary newTransaction() {
       XidImpl xid = newXID();
+      Binary binary = new Binary(xid.getGlobalTransactionId());
       Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
-      transactions.put(xid, transaction);
-      return new Binary(xid.getGlobalTransactionId());
+      transactions.put(binary, transaction);
+      return binary;
    }
 
-   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
-      XidImpl xid = newXID(txid.getArray());
-      Transaction tx = transactions.get(xid);
+   public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
+      Transaction tx;
+
+      if (remove) {
+         tx = transactions.remove(txid);
+      } else {
+         tx = transactions.get(txid);
+      }
 
       if (tx == null) {
-         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
+         logger.warn("Couldn't find txid = " + txid);
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(txid.toString());
       }
 
       return tx;
    }
 
-   public void removeTransaction(Binary txid) {
+   public Transaction removeTransaction(Binary txid) {
       XidImpl xid = newXID(txid.getArray());
-      transactions.remove(xid);
+      return transactions.remove(xid);
    }
 
    protected XidImpl newXID() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 7f7e22b..3592dbc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
-import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -92,6 +91,10 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AtomicBoolean draining = new AtomicBoolean(false);
 
+   public Object getProtonLock() {
+      return connection.getLock();
+   }
+
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -382,8 +385,10 @@ public class AMQPSessionCallback implements SessionCallback {
       condition.setDescription(errorMessage);
       Rejected rejected = new Rejected();
       rejected.setError(condition);
-      delivery.disposition(rejected);
-      delivery.settle();
+      synchronized (connection.getLock()) {
+         delivery.disposition(rejected);
+         delivery.settle();
+      }
       connection.flush();
    }
 
@@ -536,29 +541,14 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
-      return protonSPI.getTransaction(txid);
+   public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
+      return protonSPI.getTransaction(txid, remove);
    }
 
    public Binary newTransaction() {
       return protonSPI.newTransaction();
    }
 
-   public void commitTX(Binary txid) throws Exception {
-      Transaction tx = protonSPI.getTransaction(txid);
-      tx.commit(true);
-      protonSPI.removeTransaction(txid);
-   }
-
-   public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
-      Transaction tx = protonSPI.getTransaction(txid);
-      tx.rollback();
-      protonSPI.removeTransaction(txid);
-   }
-
-   public void dischargeTx(Binary txid) throws ActiveMQAMQPException {
-      ((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge();
-   }
 
    public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
       return serverSession.getMatchingQueue(address, routingType);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index d1fc0e1..ccc4a6c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -142,7 +142,7 @@ public class AMQPSessionContext extends ProtonInitializable {
    }
 
    public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
-      ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
+      ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection);
 
       coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index f08c1fc..54467cf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -155,7 +155,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          if (delivery.getRemoteState() instanceof TransactionalState) {
 
             TransactionalState txState = (TransactionalState) delivery.getRemoteState();
-            tx = this.sessionSPI.getTransaction(txState.getTxnId());
+            tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
          }
 
          sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
@@ -201,8 +201,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       } else {
          synchronized (connection.getLock()) {
             receiver.flow(credits);
-            connection.flush();
          }
+         connection.flush();
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 0e0447f..5a97c02 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -493,7 +493,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          if (remoteState instanceof TransactionalState) {
 
             TransactionalState txState = (TransactionalState) remoteState;
-            ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId());
+            ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
 
             if (txState.getOutcome() != null) {
                settleImmediate = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 721bd33..12498b0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -18,10 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
-import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
 import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -36,9 +35,6 @@ import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 /**
  * handles an amqp Coordinator to deal with transaction boundaries etc
  */
@@ -47,17 +43,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
 
    public static final int DEFAULT_COORDINATOR_CREDIT = 100;
+   public static final int CREDIT_LOW_WATERMARK = 30;
 
    final AMQPSessionCallback sessionSPI;
+   final AMQPConnectionContext connection;
 
-   public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
+   public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
       this.sessionSPI = sessionSPI;
+      this.connection = connection;
    }
 
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
       final Receiver receiver;
       try {
          receiver = ((Receiver) delivery.getLink());
@@ -66,9 +63,21 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             return;
          }
 
-         receiver.recv(new NettyWritable(buffer));
+         byte[] buffer;
+
+         synchronized (connection.getLock()) {
+            // Replenish coordinator receiver credit on exhaustion so sender can continue
+            // transaction declare and discahrge operations.
+            if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
+               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+            }
+
+            buffer = new byte[delivery.available()];
+            receiver.recv(buffer, 0, buffer.length);
+            receiver.advance();
+         }
+
 
-         receiver.advance();
 
          MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
 
@@ -78,44 +87,47 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             Binary txID = sessionSPI.newTransaction();
             Declared declared = new Declared();
             declared.setTxnId(txID);
-            delivery.disposition(declared);
+            synchronized (connection.getLock()) {
+               delivery.disposition(declared);
+            }
          } else if (action instanceof Discharge) {
             Discharge discharge = (Discharge) action;
 
             Binary txID = discharge.getTxnId();
-            sessionSPI.dischargeTx(txID);
+            ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
+            tx.discharge();
+
             if (discharge.getFail()) {
-               try {
-                  sessionSPI.rollbackTX(txID, true);
+               tx.rollback();
+               synchronized (connection.getLock()) {
                   delivery.disposition(new Accepted());
-               } catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
                }
+               connection.flush();
             } else {
-               try {
-                  sessionSPI.commitTX(txID);
+               tx.commit();
+               synchronized (connection.getLock()) {
                   delivery.disposition(new Accepted());
-               } catch (ActiveMQAMQPException amqpE) {
-                  throw amqpE;
-               } catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                }
-            }
-
-            // Replenish coordinator receiver credit on exhaustion so sender can continue
-            // transaction declare and discahrge operations.
-            if (receiver.getCredit() == 0) {
-               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+               connection.flush();
             }
          }
       } catch (ActiveMQAMQPException amqpE) {
-         delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
-      } catch (Exception e) {
+         log.warn(amqpE.getMessage(), amqpE);
+         synchronized (connection.getLock()) {
+            delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+         }
+         connection.flush();
+      } catch (Throwable e) {
          log.warn(e.getMessage(), e);
-         delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+         synchronized (connection.getLock()) {
+            delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+         }
+         connection.flush();
       } finally {
-         delivery.settle();
-         buffer.release();
+         synchronized (connection.getLock()) {
+            delivery.settle();
+         }
+         connection.flush();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
index 9257c6b..4267b85 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
@@ -16,28 +16,14 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.util;
 
-import io.netty.buffer.ByteBuf;
-import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 
 public class DeliveryUtil {
 
-   public static int readDelivery(Receiver receiver, ByteBuf buffer) {
-      int initial = buffer.writerIndex();
-      // optimization by norman
-      int count;
-      while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) {
-         // Increment the writer index by the number of bytes written into it while calling recv.
-         buffer.writerIndex(buffer.writerIndex() + count);
-         buffer.ensureWritable(count);
-      }
-      return buffer.writerIndex() - initial;
-   }
-
-   public static MessageImpl decodeMessageImpl(ByteBuf buffer) {
+   public static MessageImpl decodeMessageImpl(byte[] data) {
       MessageImpl message = (MessageImpl) Message.Factory.create();
-      message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+      message.decode(data, 0, data.length);
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index c00cc1c..41bc5e7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,9 +17,20 @@
 
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -27,6 +38,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -788,4 +801,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
       connection.close();
    }
+
+   @Test(timeout = 120000)
+   public void testSendPersistentTX() throws Exception {
+      int MESSAGE_COUNT = 100000;
+      AtomicInteger errors = new AtomicInteger(0);
+      server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true);
+      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      Connection sendConnection = factory.createConnection();
+      Connection consumerConnection = factory.createConnection();
+      try {
+
+         Thread receiverThread = new Thread() {
+            @Override
+            public void run() {
+               try {
+                  consumerConnection.start();
+                  Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+                  javax.jms.Queue q1 = consumerSession.createQueue("q1");
+
+                  MessageConsumer consumer = consumerSession.createConsumer(q1);
+
+                  for (int i = 1; i <= MESSAGE_COUNT; i++) {
+                     Message message = consumer.receive(5000);
+                     if (message == null) {
+                        throw new IOException("No message read in time.");
+                     }
+
+                     if (i % 100 == 0) {
+                        if (i % 1000 == 0) System.out.println("Read message " + i);
+                        consumerSession.commit();
+                     }
+                  }
+
+                  // Assure that all messages are consumed
+                  consumerSession.commit();
+               } catch (Exception e) {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+
+            }
+         };
+
+         receiverThread.start();
+
+         Session sendingSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+         javax.jms.Queue q1 = sendingSession.createQueue("q1");
+         MessageProducer producer = sendingSession.createProducer(q1);
+         producer.setDeliveryDelay(DeliveryMode.NON_PERSISTENT);
+         for (int i = 0; i < MESSAGE_COUNT; i++) {
+            producer.send(sendingSession.createTextMessage("message " + i), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+            if (i % 100 == 0) {
+               if (i % 1000 == 0) System.out.println("Sending " + i);
+               sendingSession.commit();
+            }
+         }
+
+         sendingSession.commit();
+
+         receiverThread.join(50000);
+         Assert.assertFalse(receiverThread.isAlive());
+
+         Assert.assertEquals(0, errors.get());
+
+      } catch (Exception e) {
+         e.printStackTrace();
+      } finally {
+         sendConnection.close();
+         consumerConnection.close();
+      }
+
+   }
 }

Loading...