[1/2] activemq-artemis git commit: ARTEMIS-1929 race in STOMP w/identical durable subs

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/2] activemq-artemis git commit: ARTEMIS-1929 race in STOMP w/identical durable subs

gaohoward-2
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7c5470548 -> 638ff75f4


ARTEMIS-1929 race in STOMP w/identical durable subs


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e53d8f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e53d8f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e53d8f5

Branch: refs/heads/master
Commit: 2e53d8f5fbf60da0aedc53e9da77f7579f301cad
Parents: 7c54705
Author: Justin Bertram <[hidden email]>
Authored: Tue Sep 25 16:41:27 2018 -0500
Committer: Howard Gao <[hidden email]>
Committed: Thu Nov 8 07:55:29 2018 +0800

----------------------------------------------------------------------
 .../core/protocol/stomp/StompSession.java       |  7 +-
 .../integration/stomp/v12/StompV12Test.java     | 83 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e53d8f5/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 291634f..b355168 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -268,7 +269,11 @@ public class StompSession implements SessionCallback {
             }
             queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
             if (manager.getServer().locateQueue(queueName) == null) {
-               session.createQueue(address, queueName, selectorSimple, false, true);
+               try {
+                  session.createQueue(address, queueName, selectorSimple, false, true);
+               } catch (ActiveMQQueueExistsException e) {
+                  // ignore; can be caused by concurrent durable subscribers
+               }
             }
          } else {
             queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e53d8f5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index 61ef22c..e3b61b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -32,10 +32,12 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
@@ -1452,6 +1454,87 @@ public class StompV12Test extends StompTestBase {
    }
 
    @Test
+   public void testMultipleDurableSubscribers() throws Exception {
+      org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
+      conn.connect(defUser, defPass, "myClientID");
+      StompClientConnectionV12 conn2 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
+      conn2.connect(defUser, defPass, "myClientID");
+
+      subscribe(conn, UUID.randomUUID().toString(), "client-individual", getName());
+      subscribe(conn2, UUID.randomUUID().toString(), "clientindividual", getName());
+
+      conn.closeTransport();
+      waitDisconnect(conn);
+      conn2.closeTransport();
+      waitDisconnect(conn2);
+   }
+
+   @Test
+   public void testMultipleConcurrentDurableSubscribers() throws Exception {
+      org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
+
+      int NUMBER_OF_THREADS = 25;
+      SubscriberThread[] threads = new SubscriberThread[NUMBER_OF_THREADS];
+      final CountDownLatch startFlag = new CountDownLatch(1);
+      final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
+
+      for (int i = 0; i < threads.length; i++) {
+         threads[i] = new SubscriberThread("subscriber::" + i, StompClientConnectionFactory.createClientConnection(uri), startFlag, alignFlag);
+      }
+
+      for (SubscriberThread t : threads) {
+         t.start();
+      }
+
+      alignFlag.await();
+
+      startFlag.countDown();
+
+      for (SubscriberThread t : threads) {
+         t.join();
+         Assert.assertEquals(0, t.errors.get());
+      }
+   }
+
+   class SubscriberThread extends Thread {
+      final StompClientConnection connection;
+      final CountDownLatch startFlag;
+      final CountDownLatch alignFlag;
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      SubscriberThread(String name, StompClientConnection connection, CountDownLatch startFlag, CountDownLatch alignFlag) {
+         super(name);
+         this.connection = connection;
+         this.startFlag = startFlag;
+         this.alignFlag = alignFlag;
+      }
+
+      @Override
+      public void run() {
+         try {
+            alignFlag.countDown();
+            startFlag.await();
+            connection.connect(defUser, defPass, "myClientID");
+            ClientStompFrame frame = subscribeTopic(connection, UUID.randomUUID().toString(), "client-individual", "123");
+            if (frame.getCommand().equals(Stomp.Responses.ERROR)) {
+
+               errors.incrementAndGet();
+            }
+         } catch (Exception e) {
+            e.printStackTrace();
+            errors.incrementAndGet();
+         } finally {
+            try {
+               connection.disconnect();
+               waitDisconnect((StompClientConnectionV12) connection);
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   @Test
    public void testDurableSubscriberWithReconnection() throws Exception {
       conn.connect(defUser, defPass, CLIENT_ID);
 

Reply | Threaded
Open this post in threaded view
|

[2/2] activemq-artemis git commit: This closes #2385

gaohoward-2
This closes #2385


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/638ff75f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/638ff75f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/638ff75f

Branch: refs/heads/master
Commit: 638ff75f4b200591f463d2225db4679769fcb920
Parents: 7c54705 2e53d8f
Author: Howard Gao <[hidden email]>
Authored: Thu Nov 8 08:00:22 2018 +0800
Committer: Howard Gao <[hidden email]>
Committed: Thu Nov 8 08:00:22 2018 +0800

----------------------------------------------------------------------
 .../core/protocol/stomp/StompSession.java       |  7 +-
 .../integration/stomp/v12/StompV12Test.java     | 83 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------