activemq git commit: AMQ-6858 - reworking durable subscription propagation fix

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

activemq git commit: AMQ-6858 - reworking durable subscription propagation fix

cshannon
Repository: activemq
Updated Branches:
  refs/heads/master a0a23b99c -> 41211c78d


AMQ-6858 - reworking durable subscription propagation fix

Significantly reworking previous fix so that the client id is properly
changed when tracking network proxy subscriptions. This makes it so
removal is done properly


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41211c78
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41211c78
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41211c78

Branch: refs/heads/master
Commit: 41211c78d19b545a2352584d3598346aa3705be4
Parents: a0a23b9
Author: Christopher L. Shannon <[hidden email]>
Authored: Sun Nov 12 15:37:40 2017 -0500
Committer: Christopher L. Shannon (cshannon) <[hidden email]>
Committed: Mon Nov 13 11:07:43 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |   8 +-
 .../network/DemandForwardingBridgeSupport.java  |  86 ++-
 .../activemq/network/DemandSubscription.java    |   8 +
 .../activemq/network/DurableConduitBridge.java  |   6 +-
 .../DurableFiveBrokerNetworkBridgeTest.java     | 576 +++++++++++++++++++
 .../DurableThreeBrokerNetworkBridgeTest.java    | 241 --------
 6 files changed, 659 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 6ced896..bc9d004 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -80,7 +81,12 @@ public class ConduitBridge extends DemandForwardingBridge {
                         ds.addForcedDurableConsumer(info.getConsumerId());
                     }
                 } else {
-                    ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+                 if (isProxyNSConsumer(info)) {
+                    final BrokerId[] path = info.getBrokerPath();
+                    addProxyNetworkSubscription(ds, path, info.getSubscriptionName());
+                 } else {
+                 ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+                 }
                 }
                 matched = true;
                 // continue - we want interest to any existing DemandSubscriptions

http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index efdfa5a..03e79e4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
@@ -94,7 +95,6 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.failover.FailoverTransport;
-import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -666,11 +666,52 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
      * @param info
      * @return
      */
-    protected boolean isBridgeNS(ConsumerInfo info) {
+    protected boolean isDirectBridgeConsumer(ConsumerInfo info) {
         return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
                 (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
     }
 
+    private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
+        if (info.getSubcriptionName() != null && info.getClientId() != null) {
+            if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
+                    && !info.getClientId().startsWith(configuration.getName())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) {
+        if (sub != null && path.length > 1 && subName != null) {
+            String b1 = path[path.length-1].toString();
+            String b2 = path[path.length-2].toString();
+            final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + "_inbound_" + b1, subName);
+            sub.getDurableRemoteSubs().add(newSubInfo);
+            sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
+            LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
+        } else {
+            LOG.debug("Skipping addProxyNetworkSubscription");
+        }
+    }
+
+    private String getProxyBridgeClientId(SubscriptionInfo info) {
+        String[] clientIdTokens = info.getClientId().split("_");
+        String newClientId = "";
+        if (clientIdTokens.length > 2) {
+            for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; j++) {
+                newClientId += clientIdTokens[j];
+                if (j < clientIdTokens.length -1) {
+                    newClientId += "_";
+                }
+            }
+        }
+        return newClientId;
+    }
+
+    protected boolean isProxyNSConsumer(ConsumerInfo info) {
+        return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
+    }
+
     protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
@@ -706,7 +747,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                                     //dynamicallyIncludedDestinations list
                                                     //Also re-add network consumers that are not part of this direct
                                                     //bridge (proxy of proxy bridges)
-                                                    if((info.getSubscriptionName() == null || !isBridgeNS(info)) &&
+                                                    if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
                                                             NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
                                                         serviceRemoteConsumerAdvisory(info);
                                                     }
@@ -975,8 +1016,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                 DemandSubscription ds = i.next();
                 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
+
                 if (removed) {
                     cleanupDurableSub(ds, i);
+                //If this is a proxy bridge subscription we need to try changing the clientId
+                } else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){
+                    subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo));
+                    if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
+                        AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger());
+                        count.decrementAndGet();
+                        //Only remove the durable remote sub if the count <= 0
+                        if (count.get() <= 0) {
+                            ds.getDurableRemoteSubs().remove(subscriptionInfo);
+                            ds.getNetworkDemandConsumerMap().remove(subscriptionInfo);
+                            cleanupDurableSub(ds, i);
+                        }
+                    }
                 }
             }
         }
@@ -984,6 +1039,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     private void cleanupDurableSub(final DemandSubscription ds,
             Iterator<DemandSubscription> i) throws IOException {
+
         if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
                 && ds.getForcedDurableConsumersSize() == 0) {
             // deactivate subscriber
@@ -998,9 +1054,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             localBroker.oneway(sending);
 
             //remove subscriber from map
-            if (i != null) {
-                i.remove();
-            }
+            i.remove();
         }
     }
 
@@ -1094,18 +1148,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 public void run() {
                     sub.waitForCompletion();
                     try {
-                        //If removing a network durable subscription that still has durable remote subs
-                        //make sure we cleanup the durable subscription properly - necessary when using
-                        //durable subscriptions and 3 or more brokers
-                        if (configuration.isConduitSubscriptions() &&
-                                sub.getLocalInfo().getSubscriptionName() != null &&
-                                sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
-                                sub.getDurableRemoteSubs().size() > 0) {
-                            sub.getDurableRemoteSubs().clear();
-                            cleanupDurableSub(sub, null);
-                        } else {
-                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
-                        }
+                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
                     } catch (IOException e) {
                         LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
                     }
@@ -1367,7 +1410,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 undoMapRegistration(sub);
             } else {
                 if (consumerInfo.isDurable()) {
-                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
+                 if (isProxyNSConsumer(sub.getRemoteInfo())) {
+                 BrokerId[] path = sub.getRemoteInfo().getBrokerPath();
+                 addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName());
+                 } else {
+             sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
+             }
                 }
                 addSubscription(sub);
                 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);

http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index 371df0a..96a9baf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.network;
 
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +44,8 @@ public class DemandSubscription {
     private final AtomicBoolean activeWaiter = new AtomicBoolean();
     private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
     private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
+    //Used for proxy network consumers
+    private final Map<SubscriptionInfo, AtomicInteger> networkDemandConsumerMap = new ConcurrentHashMap<>();
     private SubscriptionInfo localDurableSubscriber;
 
     private NetworkBridgeFilter networkBridgeFilter;
@@ -83,6 +87,10 @@ public class DemandSubscription {
         return durableRemoteSubs;
     }
 
+    public Map<SubscriptionInfo, AtomicInteger> getNetworkDemandConsumerMap() {
+        return networkDemandConsumerMap;
+    }
+
     /**
      * @return true if there are no interested consumers
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 42f30a4..8d14f74 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -142,12 +142,8 @@ public class DurableConduitBridge extends ConduitBridge {
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
             // and override the consumerId with something unique so that it won't
             // be removed if the durable subscriber (at the other end) goes away
-            //Only do this for direct bridge consumers - proxy network consumers we don't
-            //want to replace the consumerId or cleanup won't happen properly
-            if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) {
-                info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+           info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
                                consumerIdGenerator.getNextSequenceId()));
-            }
         }
         info.setSelector(null);
         DemandSubscription demandSubscription = doCreateDemandSubscription(info);

http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
new file mode 100644
index 0000000..4a63553
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Test;
+
+/**
+ * Test to make sure durable subscriptions propagate properly throughout network bridges
+ * and that conduit subscriptions work properly
+ */
+public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
+
+    private boolean duplex = true;
+
+    @Override
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
+        connector.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
+        connector.setDuplex(duplex);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setSyncDurableSubs(true);
+        connector.setNetworkTTL(-1);
+        return connector;
+    }
+
+    public void testDurablePropagationDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagation();
+    }
+
+    public void testDurablePropagationOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagation();
+    }
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    protected void testDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+        assertNotNull(clientB.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subB");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagationConsumerAllBrokersDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationConsumerAllBrokers();
+    }
+
+    public void testDurablePropagationConsumerAllBrokersOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationConsumerAllBrokers();
+    }
+
+    protected void testDurablePropagationConsumerAllBrokers() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerB");
+        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        Session ses3 = createSession("BrokerC");
+        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subB");
+        ses3.unsubscribe("subC");
+
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagation5BrokerDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagation5Broker();
+    }
+
+    public void testDurablePropagation5BrokerOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagation5Broker();
+    }
+
+    protected void testDurablePropagation5Broker() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerD");
+        bridgeBrokers("BrokerD", "BrokerE");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("BrokerD", "BrokerC");
+            bridgeBrokers("BrokerE", "BrokerD");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerE", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerE");
+        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        Thread.sleep(1000);
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientE.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subE");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagationSpokeDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationSpoke();
+    }
+
+    public void testDurablePropagationSpokeOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationSpoke();
+    }
+
+    protected void testDurablePropagationSpoke() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerB", "BrokerD");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("BrokerD", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerB");
+        Session ses3 = createSession("BrokerC");
+        Session ses4 = createSession("BrokerD");
+
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        sendMessages("BrokerA", dest, 1);
+        assertNotNull(clientD.receive(1000));
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientD.receive(1000));
+
+        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientAB.close();
+        clientB.close();
+        clientC.close();
+        clientD.close();
+
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subAB");
+        ses2.unsubscribe("subB");
+        ses3.unsubscribe("subC");
+        ses4.unsubscribe("subD");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+    }
+
+    public void testForceDurablePropagationDuplex() throws Exception {
+        duplex = true;
+        testForceDurablePropagation();
+    }
+
+    public void testForceDurablePropagationOneWay() throws Exception {
+        duplex = false;
+        testForceDurablePropagation();
+    }
+
+    protected void testForceDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createConsumer(dest);
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createConsumer(dest);
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientC.close();
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+    }
+
+    public void testDurablePropagationSyncDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationSync();
+    }
+
+    public void testDurablePropagationSyncOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationSync();
+    }
+
+    protected void testDurablePropagationSync() throws Exception {
+        // Setup broker networks
+        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+
+        NetworkConnector nc3 = null;
+        NetworkConnector nc4 = null;
+        if (!duplex) {
+            nc3 = bridgeBrokers("BrokerB", "BrokerA");
+            nc4 = bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        nc1.stop();
+        nc2.stop();
+
+        if (!duplex) {
+            nc3.stop();
+            nc4.stop();
+        }
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+        nc1.start();
+        nc2.start();
+        if (!duplex) {
+            nc3.start();
+            nc4.start();
+        }
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+    }
+
+    public void testDurablePropagationMultipleBridgesDifferentDestinations() throws Exception {
+        duplex = true;
+
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        //Duplicate the bridges with different included destinations - valid use case
+        NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC");
+        nc3.setName("nc3");
+        nc4.setName("nc4");
+        nc3.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true")));
+        nc4.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true")));
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+        ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        MessageConsumer clientCc = ses2.createDurableSubscriber(dest2, "subCc");
+        Thread.sleep(1000);
+
+        //make sure network durables are online
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+
+        clientA.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+
+        clientAa.close();
+        clientCc.close();
+        ses.unsubscribe("subAa");
+        ses2.unsubscribe("subCc");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0);
+    }
+
+    protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
+            final int count) throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == getNCDurableSubs(brokerService, dest).size();
+            }
+        }, 10000, 500));
+    }
+
+    protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
+            final ActiveMQTopic dest) throws Exception {
+        List<DurableTopicSubscription> subs = new ArrayList<>();
+        Destination d = brokerService.getDestination(dest);
+        org.apache.activemq.broker.region.Topic destination = null;
+        if (d instanceof DestinationFilter) {
+            destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
+        } else {
+            destination = (org.apache.activemq.broker.region.Topic) d;
+        }
+
+        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
+            if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
+                DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
+                if (sub != null) {
+                    subs.add(sub);
+                }
+            }
+        }
+
+        return subs;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        String options = new String("?persistent=false&useJmx=false");
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
+        createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
+        createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + options));
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.setBrokerId(broker.getBrokerName());
+    }
+
+    protected Session createSession(String broker) throws Exception {
+        Connection con = createConnection(broker);
+        con.start();
+        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public static Test suite() {
+        return suite(DurableFiveBrokerNetworkBridgeTest.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
deleted file mode 100644
index ff09a1c..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.network;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.util.SubscriptionKey;
-import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-
-import com.google.common.collect.Lists;
-
-import junit.framework.Test;
-
-/**
- * Test to make sure durable subscriptions propagate properly throughout network bridges
- * and that conduit subscriptions work properly
- */
-public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
-
-    @Override
-    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
-        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
-        connector.setDynamicallyIncludedDestinations(
-                Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
-        connector.setDuplex(true);
-        connector.setDecreaseNetworkConsumerPriority(false);
-        connector.setConduitSubscriptions(true);
-        connector.setSyncDurableSubs(true);
-        connector.setNetworkTTL(-1);
-        return connector;
-    }
-
-    /**
-     * BrokerA -> BrokerB -> BrokerC
-     */
-    public void testDurablePropagation() throws Exception {
-        // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
-
-        // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-
-        sendMessages("BrokerC", dest, 1);
-        assertNotNull(clientA.receive(1000));
-        assertNotNull(clientB.receive(1000));
-
-        //bring online a consumer on the other side
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-        //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientB.close();
-        clientC.close();
-        ses.unsubscribe("subA");
-        ses.unsubscribe("subB");
-        ses2.unsubscribe("subC");
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-
-    }
-
-    public void testForceDurablePropagation() throws Exception {
-        // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        MessageConsumer clientA = ses.createConsumer(dest);
-
-        // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-
-        sendMessages("BrokerC", dest, 1);
-        assertNotNull(clientA.receive(1000));
-
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientC = ses2.createConsumer(dest);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientC.close();
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-    }
-
-    public void testDurablePropagationSync() throws Exception {
-        // Setup broker networks
-        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
-        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        nc1.stop();
-        nc2.stop();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-
-        nc1.start();
-        nc2.start();
-
-        //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientB.close();
-        clientC.close();
-    }
-
-
-    protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
-            final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getNCDurableSubs(brokerService, dest).size();
-            }
-        }, 10000, 500));
-    }
-
-    protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
-            final ActiveMQTopic dest) throws Exception {
-        List<DurableTopicSubscription> subs = new ArrayList<>();
-        Destination d = brokerService.getDestination(dest);
-        org.apache.activemq.broker.region.Topic destination = null;
-        if (d instanceof DestinationFilter) {
-            destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
-        } else {
-            destination = (org.apache.activemq.broker.region.Topic) d;
-        }
-
-        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
-            if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
-                DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
-                if (sub != null) {
-                    subs.add(sub);
-                }
-            }
-        }
-
-        return subs;
-    }
-
-    @Override
-    public void setUp() throws Exception {
-        super.setAutoFail(true);
-        super.setUp();
-        String options = new String("?persistent=false&useJmx=false");
-        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
-        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
-        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
-    }
-
-    @Override
-    protected void configureBroker(BrokerService broker) {
-        broker.setBrokerId(broker.getBrokerName());
-    }
-
-    protected Session createSession(String broker) throws Exception {
-        Connection con = createConnection(broker);
-        con.start();
-        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public static Test suite() {
-        return suite(DurableThreeBrokerNetworkBridgeTest.class);
-    }
-}