[activemq-artemis] branch master updated: ARTEMIS-2613: Add support for DivertBindings for federated addresses

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq-artemis] branch master updated: ARTEMIS-2613: Add support for DivertBindings for federated addresses

cshannon
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3966e47  ARTEMIS-2613: Add support for DivertBindings for federated addresses
     new d3d9ceb  This closes #2966
3966e47 is described below

commit 3966e4733877a72dcac7a0aa8451665699a705b9
Author: Christopher L. Shannon (cshannon) <[hidden email]>
AuthorDate: Thu Jan 30 08:53:32 2020 -0500

    ARTEMIS-2613: Add support for DivertBindings for federated addresses
   
    This will allow federated addresses to create remote consumers based on
    the existence of divert bindings and matching queue bindings
---
 .../FederationAddressPolicyConfiguration.java      |  16 +-
 .../deployers/impl/FileConfigurationParser.java    |   3 +
 .../artemis/core/server/ActiveMQServerLogger.java  |   5 +
 .../activemq/artemis/core/server/Divert.java       |   2 +
 .../federation/address/FederatedAddress.java       | 221 ++++++++++++++++-----
 .../artemis/core/server/impl/DivertImpl.java       |   5 +
 .../plugin/ActiveMQServerFederationPlugin.java     |   6 +
 .../resources/schema/artemis-configuration.xsd     |   1 +
 .../src/test/resources/artemis-configuration.xsd   |   1 +
 docs/user-manual/en/federation-address.md          |  15 ++
 .../federation/federated-address-divert/pom.xml    | 173 ++++++++++++++++
 .../federation/federated-address-divert/readme.md  |  18 ++
 .../jms/example/FederatedAddressDivertExample.java | 117 +++++++++++
 .../src/main/resources/activemq/server0/broker.xml | 111 +++++++++++
 .../src/main/resources/activemq/server1/broker.xml |  65 ++++++
 examples/features/federation/pom.xml               |   2 +
 .../federation/FederatedAddressTest.java           | 168 ++++++++++++++--
 17 files changed, 867 insertions(+), 62 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java
index 71188e6..f00fbd9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java
@@ -34,6 +34,7 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
    private Long autoDeleteMessageCount;
    private int maxHops;
    private String transformerRef;
+   private boolean enableDivertBindings;
 
    @Override
    public String getName() {
@@ -109,6 +110,15 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
       return this;
    }
 
+   public Boolean isEnableDivertBindings() {
+      return enableDivertBindings;
+   }
+
+   public FederationAddressPolicyConfiguration setEnableDivertBindings(Boolean enableDivertBindings) {
+      this.enableDivertBindings = enableDivertBindings;
+      return this;
+   }
+
    @Override
    public void encode(ActiveMQBuffer buffer) {
       Preconditions.checkArgument(name != null, "name can not be null");
@@ -118,9 +128,9 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
       buffer.writeNullableLong(autoDeleteMessageCount);
       buffer.writeInt(maxHops);
       buffer.writeNullableString(transformerRef);
-
       encodeMatchers(buffer, includes);
       encodeMatchers(buffer, excludes);
+      buffer.writeBoolean(enableDivertBindings);
    }
 
    @Override
@@ -131,12 +141,14 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
       autoDeleteMessageCount = buffer.readNullableLong();
       maxHops = buffer.readInt();
       transformerRef = buffer.readNullableString();
-
       includes = new HashSet<>();
       excludes = new HashSet<>();
       decodeMatchers(buffer, includes);
       decodeMatchers(buffer, excludes);
 
+      if (buffer.readableBytes() > 0) {
+         enableDivertBindings = buffer.readBoolean();
+      }
    }
 
    private void encodeMatchers(final ActiveMQBuffer buffer, final Set<Matcher> matchers) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index ce4e39d..0087f44 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2091,6 +2091,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          } else if (item.getNodeName().equals("transformer-ref")) {
             String transformerRef = item.getNodeValue();
             config.setTransformerRef(transformerRef);
+         } else if (item.getNodeName().equals("enable-divert-bindings")) {
+            boolean enableDivertBindings = Boolean.parseBoolean(item.getNodeValue());
+            config.setEnableDivertBindings(enableDivertBindings);
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 84de5ed..4909c0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1659,6 +1659,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void federationPluginExecutionError(@Cause Throwable e, String pluginMethod);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222287, value = "Error looking up bindings for address {}.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void federationBindingsLookupError(@Cause Throwable e, SimpleString address);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java
index 5b2694c..f51f4b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java
@@ -31,4 +31,6 @@ public interface Divert extends Bindable {
    SimpleString getRoutingName();
 
    Transformer getTransformer();
+
+   SimpleString getForwardAddress();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
index 9a10cde..2a3d7e5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java
@@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.server.federation.address;
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -34,8 +36,9 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
+import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -43,11 +46,13 @@ import org.apache.activemq.artemis.core.server.federation.FederatedAbstract;
 import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
 import org.apache.activemq.artemis.core.server.federation.Federation;
 import org.apache.activemq.artemis.core.server.federation.FederationUpstream;
-import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.settings.impl.Match;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ByteUtil;
-import org.jboss.logging.Logger;
 
 /**
  * Federated Address, replicate messages from the remote brokers address to itself.
@@ -59,9 +64,8 @@ import org.jboss.logging.Logger;
  *
  *
  */
-public class FederatedAddress extends FederatedAbstract implements ActiveMQServerQueuePlugin, Serializable {
+public class FederatedAddress extends FederatedAbstract implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin, Serializable {
 
-   private static final Logger logger = Logger.getLogger(FederatedAddress.class);
    public static final String FEDERATED_QUEUE_PREFIX = "federated";
 
    public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops");
@@ -69,8 +73,8 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
    private final SimpleString filterString;
    private final Set<Matcher> includes;
    private final Set<Matcher> excludes;
-
    private final FederationAddressPolicyConfiguration config;
+   private final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new HashMap<>();
 
    public FederatedAddress(Federation federation, FederationAddressPolicyConfiguration config, ActiveMQServer server, FederationUpstream upstream) {
       super(federation, server, upstream);
@@ -102,25 +106,16 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
    }
 
    @Override
-   public void start() {
-      super.start();
-      server.getPostOffice()
-            .getAllBindings()
-            .values()
-            .stream()
-            .filter(b -> b instanceof QueueBinding)
-            .map(b -> ((QueueBinding) b).getQueue())
-            .forEach(this::conditionalCreateRemoteConsumer);
-   }
-
-   /**
-    * After a queue has been created
-    *
-    * @param queue The newly created queue
-    */
-   @Override
-   public synchronized void afterCreateQueue(Queue queue) {
-      conditionalCreateRemoteConsumer(queue);
+   public synchronized void start() {
+      if (!isStarted()) {
+         super.start();
+         server.getPostOffice()
+             .getAllBindings()
+             .values()
+             .stream()
+             .filter(b -> b instanceof QueueBinding || b instanceof DivertBinding)
+             .forEach(this::afterAddBinding);
+      }
    }
 
    private void conditionalCreateRemoteConsumer(Queue queue) {
@@ -141,6 +136,145 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
       createRemoteConsumer(queue);
    }
 
+   @Override
+   public void afterAddAddress(AddressInfo addressInfo, boolean reload) {
+      if (match(addressInfo)) {
+         try {
+            //Diverts can be added without the source address existing yet so
+            //if a new address is added we need to see if there are matching divert bindings
+            server.getPostOffice()
+               .getDirectBindings(addressInfo.getName())
+               .getBindings().stream().filter(binding -> binding instanceof DivertBinding)
+               .forEach(this::afterAddBinding);
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, addressInfo.getName());
+         }
+      }
+   }
+
+   @Override
+   public void afterAddBinding(Binding binding) {
+      if (binding instanceof QueueBinding) {
+         conditionalCreateRemoteConsumer(((QueueBinding) binding).getQueue());
+
+         if (config.isEnableDivertBindings()) {
+            synchronized (this) {
+               for (Map.Entry<DivertBinding, Set<SimpleString>> entry : matchingDiverts.entrySet()) {
+                  //for each divert check the new QueueBinding to see if the divert matches and is not already tracking
+                  if (!entry.getValue().contains(((QueueBinding) binding).getQueue().getName())) {
+                     //conditionalCreateRemoteConsumer will check if the queue is a target of the divert before adding
+                     conditionalCreateRemoteConsumer(entry.getKey(), entry.getValue(), (QueueBinding) binding);
+                  }
+               }
+            }
+         }
+      } else if (config.isEnableDivertBindings() && binding instanceof DivertBinding) {
+         final DivertBinding divertBinding = (DivertBinding) binding;
+         final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
+
+         synchronized (this) {
+            if (match(addressInfo) && matchingDiverts.get(divertBinding) == null) {
+               final Set<SimpleString> matchingQueues = new HashSet<>();
+               matchingDiverts.put(divertBinding, matchingQueues);
+
+               //find existing matching queue bindings for the divert to create consumers for
+               final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress();
+               try {
+                  //create demand for each matching queue binding that isn't already tracked by the divert
+                  //conditionalCreateRemoteConsumer will check if the queue is a target of the divert before adding
+                  server.getPostOffice().getBindingsForAddress(forwardAddress).getBindings()
+                     .stream().filter(b -> b instanceof QueueBinding).map(b -> (QueueBinding) b)
+                     .forEach(queueBinding -> conditionalCreateRemoteConsumer(divertBinding, matchingQueues, queueBinding));
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, forwardAddress);
+               }
+            }
+         }
+      }
+   }
+
+   private void conditionalCreateRemoteConsumer(DivertBinding divertBinding, Set<SimpleString> matchingQueues, QueueBinding queueBinding) {
+      if (server.hasBrokerFederationPlugins()) {
+         final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
+         try {
+            server.callBrokerFederationPlugins(plugin -> {
+               conditionalCreate.set(conditionalCreate.get() && plugin.federatedAddressConditionalCreateDivertConsumer(divertBinding, queueBinding));
+            });
+         } catch (ActiveMQException t) {
+            ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedAddressConditionalCreateDivertConsumer");
+            throw new IllegalStateException(t.getMessage(), t.getCause());
+         }
+         if (!conditionalCreate.get()) {
+            return;
+         }
+      }
+      createRemoteConsumer(divertBinding, matchingQueues, queueBinding);
+   }
+
+   private void createRemoteConsumer(DivertBinding divertBinding, final Set<SimpleString> matchingQueues, QueueBinding queueBinding)  {
+      final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());
+
+      //If the divert address matches and if the new queueBinding matches the forwarding address of the divert
+      //then create a remote consumer if not already being tracked by the divert
+      if (match(addressInfo) && queueBinding.getAddress().equals(divertBinding.getDivert().getForwardAddress())
+         && matchingQueues.add(queueBinding.getQueue().getName())) {
+         FederatedConsumerKey key = getKey(addressInfo);
+         Transformer transformer = getTransformer(config.getTransformerRef());
+         Transformer addHop = FederatedAddress::addHop;
+         createRemoteConsumer(key, mergeTransformers(addHop, transformer), clientSession -> createRemoteQueue(clientSession, key));
+      }
+   }
+
+   @Override
+   public void beforeRemoveBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) {
+      final Binding binding = server.getPostOffice().getBinding(uniqueName);
+      if (binding instanceof QueueBinding) {
+         final Queue queue = ((QueueBinding) binding).getQueue();
+
+         //Remove any direct queue demand
+         removeRemoteConsumer(getKey(queue));
+
+         if (config.isEnableDivertBindings()) {
+            //See if there is any matching diverts that match this queue binding and remove demand now that
+            //the queue is going away
+            synchronized (this) {
+               matchingDiverts.entrySet().forEach(entry -> {
+                  if (entry.getKey().getDivert().getForwardAddress().equals(queue.getAddress())) {
+                     final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
+                     //check if the queue has been tracked by this divert and if so remove the consumer
+                     if (entry.getValue().remove(queue)) {
+                        removeRemoteConsumer(getKey(addressInfo));
+                     }
+                  }
+               });
+            }
+         }
+      } else if (config.isEnableDivertBindings() && binding instanceof DivertBinding) {
+         final DivertBinding divertBinding = (DivertBinding) binding;
+         final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress();
+
+         //Check if we have added this divert binding as a matching binding
+         //If we have then we need to look for any still existing queue bindings that map to this divert
+         //and remove consumers if they haven't already been removed
+         synchronized (this) {
+            final Set<SimpleString> matchingQueues;
+            if ((matchingQueues = matchingDiverts.remove(binding)) != null) {
+               try {
+                  final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
+                  if (addressInfo != null) {
+                     //remove queue binding demand if tracked by the divert
+                     server.getPostOffice().getBindingsForAddress(forwardAddress)
+                        .getBindings().stream().filter(b -> b instanceof QueueBinding && matchingQueues.remove(((QueueBinding) b).getQueue().getName()))
+                        .forEach(queueBinding -> removeRemoteConsumer(getKey(addressInfo)));
+                  }
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, forwardAddress);
+               }
+            }
+         }
+      }
+   }
+
    public FederationAddressPolicyConfiguration getConfig() {
       return config;
    }
@@ -170,12 +304,20 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
    }
 
    private boolean match(Queue queue) {
+      return match(queue.getAddress(), queue.getRoutingType());
+   }
+
+   private boolean match(AddressInfo addressInfo) {
+      return addressInfo != null ? match(addressInfo.getName(), addressInfo.getRoutingType()) : false;
+   }
+
+   private boolean match(SimpleString address, RoutingType routingType) {
       //Currently only supporting Multicast currently.
-      if (RoutingType.ANYCAST.equals(queue.getRoutingType())) {
+      if (RoutingType.ANYCAST.equals(routingType)) {
          return false;
       }
       for (Matcher exclude : excludes) {
-         if (exclude.test(queue)) {
+         if (exclude.test(address.toString())) {
             return false;
          }
       }
@@ -183,7 +325,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
          return true;
       } else {
          for (Matcher include : includes) {
-            if (include.test(queue)) {
+            if (include.test(address.toString())) {
                return true;
             }
          }
@@ -208,24 +350,15 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
       }
    }
 
-   /**
-    * Before an address is removed
-    *
-    * @param queue The queue that will be removed
-    */
-   @Override
-   public synchronized void beforeDestroyQueue(Queue queue, final SecurityAuth session, boolean checkConsumerCount,
-      boolean removeConsumers, boolean autoDeleteAddress) {
-      FederatedConsumerKey key = getKey(queue);
-      removeRemoteConsumer(key);
-   }
-
    private FederatedConsumerKey getKey(Queue queue) {
       return new FederatedAddressConsumerKey(federation.getName(), upstream.getName(), queue.getAddress(), queue.getRoutingType(), queueNameFormat, filterString);
    }
 
-   public static class Matcher {
+   private FederatedConsumerKey getKey(AddressInfo address) {
+      return new FederatedAddressConsumerKey(federation.getName(), upstream.getName(), address.getName(), address.getRoutingType(), queueNameFormat, filterString);
+   }
 
+   public static class Matcher {
       Predicate<String> addressPredicate;
 
       Matcher(FederationAddressPolicyConfiguration.Matcher config, WildcardConfiguration wildcardConfiguration) {
@@ -234,10 +367,8 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
          }
       }
 
-      public boolean test(Queue queue) {
-         return addressPredicate == null || addressPredicate.test(queue.getAddress().toString());
+      public boolean test(String address) {
+         return addressPredicate == null || addressPredicate.test(address);
       }
-
    }
-
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 7e37adb..1bf6123 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -163,6 +163,11 @@ public class DivertImpl implements Divert {
       return transformer;
    }
 
+   @Override
+   public SimpleString getForwardAddress() {
+      return forwardAddress;
+   }
+
    /* (non-Javadoc)
     * @see java.lang.Object#toString()
     */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java
index 8e27693..e64d264 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java
@@ -19,6 +19,8 @@ package org.apache.activemq.artemis.core.server.plugin;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
@@ -121,6 +123,10 @@ public interface ActiveMQServerFederationPlugin extends ActiveMQServerBasePlugin
       return true;
    }
 
+   default boolean federatedAddressConditionalCreateDivertConsumer(DivertBinding divertBinding, QueueBinding queueBinding) throws ActiveMQException {
+      return true;
+   }
+
    /**
     * Conditionally create a federated queue consumer for a federated queue. This allows custom
     * logic to be inserted to decide when to create federated queue consumers
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 7944094..1177f4b 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1824,6 +1824,7 @@
       <xsd:attribute name="auto-delete-message-count" type="xsd:long" use="optional" />
       <xsd:attribute name="max-hops" type="xsd:int" use="optional" />
       <xsd:attribute name="name" type="xsd:ID" use="required" />
+      <xsd:attribute name="enable-divert-bindings" type="xsd:boolean" use="optional" />
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 891df2d..b06c527 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -1696,6 +1696,7 @@
       <xsd:attribute name="auto-delete-message-count" type="xsd:long" use="optional" />
       <xsd:attribute name="max-hops" type="xsd:int" use="optional" />
       <xsd:attribute name="name" type="xsd:ID" use="required" />
+      <xsd:attribute name="enable-divert-bindings" type="xsd:boolean" use="optional" />
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 
diff --git a/docs/user-manual/en/federation-address.md b/docs/user-manual/en/federation-address.md
index 8a77cb9..68d1ba2 100644
--- a/docs/user-manual/en/federation-address.md
+++ b/docs/user-manual/en/federation-address.md
@@ -66,6 +66,18 @@ the tree can extend to any depth, and can be extended to without needing to re-c
 
 In this case messages published to the master address can be received by any consumer connected to any broker in the tree.
 
+### Divert Binding Support
+
+Divert binding support can be added as part of the address policy configuration. This will allow the federation to respond
+to divert bindings to create demand. For example, let's say there is one address called "test.federation.source" that is
+included as a match for the federated address and another address called "test.federation.target" that is not included. Normally
+when a queue is created on "test.federation.target" this would not cause a federated consumer to be created because the address
+is not part of the included matches. However, if we create a divert binding such that "test.federation.source" is the source address
+and "test.federation.target" is the forwarded address then demand will now be created. The source address still must be multicast
+but the target address can be multicast or anycast.
+
+An example use case for this might be a divert that redirects JMS topics (multicast addresses) to a JMS queue (anycast addresses) to
+allow for load balancing of the messages on a topic for legacy consumers not supporting JMS 2.0 and shared subscriptions.
 
 ## Configuring Address Federation
 
@@ -135,6 +147,9 @@ and the delay and message count params have been met. This is useful if you want
 
 - `transformer-ref`. The ref name for a transformer (see transformer config) that you may wish to configure to transform the message on federation transfer.
 
+- `enable-divert-bindings`. Setting this to true makes the federation also listen to divert bindings when determining demand. If there is a divert binding with an address that matches the included
+addresses for the stream, any queue bindings that match the forward address of the divert will create demand. Default is false.
+
 **note** `address-policy`'s and `queue-policy`'s are able to be defined in the same federation, and be linked to the same upstream.
 
 
diff --git a/examples/features/federation/federated-address-divert/pom.xml b/examples/features/federation/federated-address-divert/pom.xml
new file mode 100644
index 0000000..bfab93c
--- /dev/null
+++ b/examples/features/federation/federated-address-divert/pom.xml
@@ -0,0 +1,173 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq.examples.federation</groupId>
+      <artifactId>broker-federation</artifactId>
+      <version>2.12.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>federated-address-divert</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis Federated Address Divert Example</name>
+
+   <properties>
+      <activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-jms-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+   </dependencies>
+
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>artemis-maven-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>create0</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <instance>${basedir}/target/server0</instance>
+                     <configuration>${basedir}/target/classes/activemq/server0</configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>create1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <instance>${basedir}/target/server1</instance>
+                     <configuration>${basedir}/target/classes/activemq/server1</configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>start0</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <spawn>true</spawn>
+                     <location>${basedir}/target/server0</location>
+                     <testURI>tcp://localhost:61616</testURI>
+                     <args>
+                        <param>run</param>
+                     </args>
+                     <name>eu-west-1</name>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>start1</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <spawn>true</spawn>
+                     <location>${basedir}/target/server1</location>
+                     <testURI>tcp://localhost:61617</testURI>
+                     <args>
+                        <param>run</param>
+                     </args>
+                     <name>eu-east-1</name>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>runClient</id>
+                  <goals>
+                     <goal>runClient</goal>
+                  </goals>
+                  <configuration>
+                     <clientClass>org.apache.activemq.artemis.jms.example.FederatedAddressDivertExample</clientClass>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>stop0</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <location>${basedir}/target/server0</location>
+                     <args>
+                        <param>stop</param>
+                     </args>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>stop1</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <location>${basedir}/target/server1</location>
+                     <args>
+                        <param>stop</param>
+                     </args>
+                  </configuration>
+               </execution>
+            </executions>
+            <dependencies>
+               <dependency>
+                  <groupId>org.apache.activemq.examples.federation</groupId>
+                  <artifactId>federated-address-divert</artifactId>
+                  <version>${project.version}</version>
+               </dependency>
+            </dependencies>
+         </plugin>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-clean-plugin</artifactId>
+         </plugin>
+      </plugins>
+   </build>
+   <profiles>
+      <profile>
+         <id>release</id>
+         <build>
+            <plugins>
+               <plugin>
+                  <groupId>com.vladsch.flexmark</groupId>
+                  <artifactId>markdown-page-generator-plugin</artifactId>
+               </plugin>
+            </plugins>
+         </build>
+      </profile>
+   </profiles>
+</project>
diff --git a/examples/features/federation/federated-address-divert/readme.md b/examples/features/federation/federated-address-divert/readme.md
new file mode 100644
index 0000000..b30d43b
--- /dev/null
+++ b/examples/features/federation/federated-address-divert/readme.md
@@ -0,0 +1,18 @@
+# Federated Address Divert Example
+
+To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually.
+
+This example demonstrates a core multicast address deployed on two different brokers. The two brokers are configured to form a federated address mesh.
+
+In the example we name the brokers eu-west and eu-east.
+
+The following is then carried out:
+
+1. Create a divert binding with a source address of exampleTopic and target address of divertExampleTopic on eu-west.
+
+2. Create a consumer on the topic divertExampleTopic on eu-west and create a producer on the topic exampleTopic on eu-east.
+
+3. Send some messages via the producer on eu-east, and verify that the eu-west consumer receives the messages because of the divert binding demand.
+
+
+For more information on ActiveMQ Artemis Federation please see the federation section of the user manual.
\ No newline at end of file
diff --git a/examples/features/federation/federated-address-divert/src/main/java/org/apache/activemq/artemis/jms/example/FederatedAddressDivertExample.java b/examples/features/federation/federated-address-divert/src/main/java/org/apache/activemq/artemis/jms/example/FederatedAddressDivertExample.java
new file mode 100644
index 0000000..62af0ac
--- /dev/null
+++ b/examples/features/federation/federated-address-divert/src/main/java/org/apache/activemq/artemis/jms/example/FederatedAddressDivertExample.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+
+/**
+ * A simple example that demonstrates multicast address replication between remote servers,
+ * using Address Federation feature and diverts.
+ */
+public class FederatedAddressDivertExample {
+
+   public static void main(final String[] args) throws Exception {
+      Connection connectionEUWest = null;
+
+      Connection connectionEUEast = null;
+
+
+      try {
+         // Step 1. Instantiate the Topic (multicast) for the producers
+         Topic topic = ActiveMQJMSClient.createTopic("exampleTopic");
+
+         //Create a topic for the consumers
+         Topic topic2 = ActiveMQJMSClient.createTopic("divertExampleTopic");
+
+         // Step 2. Instantiate connection towards server EU West
+         ConnectionFactory cfEUWest = new ActiveMQConnectionFactory("tcp://localhost:61616");
+
+         // Step 3. Instantiate connection towards server EU East
+         ConnectionFactory cfEUEast = new ActiveMQConnectionFactory("tcp://localhost:61617");
+
+
+         // Step 5. We create a JMS Connection connectionEUWest which is a connection to server EU West
+         connectionEUWest = cfEUWest.createConnection();
+
+         // Step 6. We create a JMS Connection connectionEUEast which is a connection to server EU East
+         connectionEUEast = cfEUEast.createConnection();
+
+         // Step 8. We create a JMS Session on server EU West
+         Session sessionEUWest = connectionEUWest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 9. We create a JMS Session on server EU East
+         Session sessionEUEast = connectionEUEast.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         // Step 11. We start the connections to ensure delivery occurs on them
+         connectionEUWest.start();
+
+         connectionEUEast.start();
+
+         // Step 12. We create a JMS MessageProducer object on server EU East
+         MessageProducer producerEUEast = sessionEUEast.createProducer(topic);
+
+         // Step 13. We create a JMS MessageConsumer object on server EU West - Messages will be diverted to this topic
+         MessageConsumer consumerEUWest = sessionEUWest.createSharedDurableConsumer(topic2, "exampleSubscription");
+
+
+         // Step 14. Allow a little time for everything to start and form.
+
+         Thread.sleep(5000);
+
+         // Step 13. We send some messages to server EU East
+         final int numMessages = 10;
+
+         // Step 15. Send the messages via the producer on EU East
+
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage message = sessionEUEast.createTextMessage("This is text sent from EU East, message " + i);
+
+            producerEUEast.send(message);
+
+            System.out.println("EU East   :: Sent message: " + message.getText());
+         }
+
+         // Step 14. We now consume those messages on server EU West.
+         // The consumer receives them even though they were sent to a different address on a separate server
+
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage messageEUWest = (TextMessage) consumerEUWest.receive(5000);
+
+            System.out.println("EU West   :: Got message: " + messageEUWest.getText());
+         }
+      } finally {
+         // Step 16. Be sure to close our resources!
+         if (connectionEUWest != null) {
+            connectionEUWest.stop();
+            connectionEUWest.close();
+         }
+
+         if (connectionEUEast != null) {
+            connectionEUEast.stop();
+            connectionEUEast.close();
+         }
+      }
+   }
+}
diff --git a/examples/features/federation/federated-address-divert/src/main/resources/activemq/server0/broker.xml b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server0/broker.xml
new file mode 100644
index 0000000..9a279c1
--- /dev/null
+++ b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server0/broker.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+   <core xmlns="urn:activemq:core">
+
+      <name>eu-west-1-master</name>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/largemessages</large-messages-directory>
+
+      <paging-directory>./data/paging</paging-directory>
+      <!-- Connectors -->
+
+      <connectors>
+         <connector name="netty-connector">tcp://localhost:61616</connector>
+         <connector name="eu-west-1-connector">tcp://localhost:61616</connector>
+         <connector name="eu-east-1-connector">tcp://localhost:61617</connector>
+      </connectors>
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
+      </acceptors>
+
+      <!-- Federation -->
+
+      <federations>
+         <federation name="eu-west-1-federation">
+            <upstream name="eu-east-1-upstream">
+               <circuit-breaker-timeout>1000</circuit-breaker-timeout>
+               <static-connectors>
+                  <connector-ref>eu-east-1-connector</connector-ref>
+               </static-connectors>
+               <policy ref="policySetA"/>
+            </upstream>
+
+            <policy-set name="policySetA">
+               <policy ref="address-federation" />
+            </policy-set>
+
+            <!-- Enable the use of divert bindings -->
+            <address-policy name="address-federation" enable-divert-bindings="true">
+               <include address-match="exampleTopic" />
+            </address-policy>
+         </federation>
+      </federations>
+
+      <!-- Divert configuration -->
+      <diverts>
+         <divert name="federation-divert">
+            <routing-name>federation-divert</routing-name>
+            <address>exampleTopic</address>
+            <forwarding-address>divertExampleTopic</forwarding-address>
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="exampleTopic">
+            <permission roles="guest" type="createDurableQueue"/>
+            <permission roles="guest" type="deleteDurableQueue"/>
+            <permission roles="guest" type="createNonDurableQueue"/>
+            <permission roles="guest" type="deleteNonDurableQueue"/>
+            <permission roles="guest" type="consume"/>
+            <permission roles="guest" type="send"/>
+         </security-setting>
+         <security-setting match="divertExampleTopic">
+            <permission roles="guest" type="createDurableQueue"/>
+            <permission roles="guest" type="deleteDurableQueue"/>
+            <permission roles="guest" type="createNonDurableQueue"/>
+            <permission roles="guest" type="deleteNonDurableQueue"/>
+            <permission roles="guest" type="consume"/>
+            <permission roles="guest" type="send"/>
+         </security-setting>
+      </security-settings>
+
+      <addresses>
+         <address name="exampleTopic">
+            <multicast />
+         </address>
+         <address name="divertExampleTopic">
+            <multicast>
+               <queue name="exampleSubscription"/>
+            </multicast>
+         </address>
+      </addresses>
+   </core>
+</configuration>
diff --git a/examples/features/federation/federated-address-divert/src/main/resources/activemq/server1/broker.xml b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server1/broker.xml
new file mode 100644
index 0000000..dda46db
--- /dev/null
+++ b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server1/broker.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+   <core xmlns="urn:activemq:core">
+
+      <name>eu-east-1-master</name>
+
+      <bindings-directory>target/server1/data/messaging/bindings</bindings-directory>
+
+      <journal-directory>target/server1/data/messaging/journal</journal-directory>
+
+      <large-messages-directory>target/server1/data/messaging/largemessages</large-messages-directory>
+
+      <paging-directory>target/server1/data/messaging/paging</paging-directory>
+
+      <!-- Connectors -->
+      <connectors>
+         <connector name="netty-connector">tcp://localhost:61617</connector>
+         <connector name="eu-west-1-connector">tcp://localhost:61616</connector>
+         <connector name="eu-east-1-connector">tcp://localhost:61617</connector>
+      </connectors>
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="netty-acceptor">tcp://localhost:61617</acceptor>
+      </acceptors>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="exampleTopic">
+            <permission roles="guest" type="createDurableQueue"/>
+            <permission roles="guest" type="deleteDurableQueue"/>
+            <permission roles="guest" type="createNonDurableQueue"/>
+            <permission roles="guest" type="deleteNonDurableQueue"/>
+            <permission roles="guest" type="consume"/>
+            <permission roles="guest" type="send"/>
+         </security-setting>
+      </security-settings>
+
+      <addresses>
+         <address name="exampleTopic">
+            <multicast />
+         </address>
+      </addresses>
+   </core>
+</configuration>
diff --git a/examples/features/federation/pom.xml b/examples/features/federation/pom.xml
index f13a087..327c66b 100644
--- a/examples/features/federation/pom.xml
+++ b/examples/features/federation/pom.xml
@@ -53,6 +53,7 @@ under the License.
             <module>federated-address</module>
             <module>federated-address-downstream</module>
             <module>federated-address-downstream-upstream</module>
+            <module>federated-address-divert</module>
          </modules>
       </profile>
       <profile>
@@ -64,6 +65,7 @@ under the License.
             <module>federated-address</module>
             <module>federated-address-downstream</module>
             <module>federated-address-downstream-upstream</module>
+            <module>federated-address-divert</module>
          </modules>
       </profile>
    </profiles>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java
index eea8925..bc87661 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java
@@ -22,11 +22,18 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -170,6 +177,139 @@ public class FederatedAddressTest extends FederatedTestBase {
       verifyTransformer(address);
    }
 
+   /**
+    * Test diverts for downstream configurations
+    */
+   //Test creating address first followed by divert
+   //Test creating divert before consumer
+   @Test
+   public void testDownstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception {
+      testFederatedAddressDivert(true,true, true);
+   }
+
+   //Test creating divert first followed by address
+   //Test creating divert before consumer
+   @Test
+   public void testDownstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception {
+      testFederatedAddressDivert(true,false, true);
+   }
+
+   //Test creating address first followed by divert
+   //Test creating consumer before divert
+   @Test
+   public void testDownstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception {
+      testFederatedAddressDivert(true,true, false);
+   }
+
+   //Test creating divert first followed by address
+   //Test creating consumer before divert
+   @Test
+   public void testDownstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception {
+      testFederatedAddressDivert(true,false, false);
+   }
+
+   /**
+    * Test diverts for upstream configurations
+    */
+   //Test creating address first followed by divert
+   //Test creating divert before consumer
+   @Test
+   public void testUpstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception {
+      testFederatedAddressDivert(false,true, true);
+   }
+
+   //Test creating divert first followed by address
+   //Test creating divert before consumer
+   @Test
+   public void testUpstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception {
+      testFederatedAddressDivert(false,false, true);
+   }
+
+   //Test creating address first followed by divert
+   //Test creating consumer before divert
+   @Test
+   public void testUpstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception {
+      testFederatedAddressDivert(false,true, false);
+   }
+
+   //Test creating divert first followed by address
+   //Test creating consumer before divert
+   @Test
+   public void testUpstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception {
+      testFederatedAddressDivert(false,false, false);
+   }
+
+   protected void testFederatedAddressDivert(boolean downstream, boolean addressFirst, boolean divertBeforeConsumer) throws Exception {
+      String address = getName();
+      String address2 = "fedOneWayDivertTest";
+
+      if (addressFirst) {
+         getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST));
+      }
+
+      final FederationConfiguration federationConfiguration;
+      final int deployServer;
+      if (downstream) {
+         federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration(
+            "server0", address, getServer(1).getConfiguration().getTransportConfigurations("server1")[0]);
+         deployServer = 1;
+      } else {
+         federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
+         deployServer = 0;
+      }
+
+      FederationAddressPolicyConfiguration policy = (FederationAddressPolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("AddressPolicy" + address);
+      //enable listening for divert bindings
+      policy.setEnableDivertBindings(true);
+      getServer(deployServer).getConfiguration().getFederationConfigurations().add(federationConfiguration);
+      getServer(deployServer).getFederationManager().deploy();
+
+      ConnectionFactory cf1 = getCF(1);
+      ConnectionFactory cf0 = getCF(0);
+      try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
+         connection1.start();
+         connection0.start();
+
+         Session session1 = connection1.createSession();
+         Topic topic1 = session1.createTopic(address);
+         MessageProducer producer1 = session1.createProducer(topic1);
+
+         if (divertBeforeConsumer) {
+            getServer(0).deployDivert(new DivertConfiguration().setName(address + ":" + address2)
+                   .setAddress(address).setExclusive(true).setForwardingAddress(address2)
+                   .setRoutingType(ComponentConfigurationRoutingType.ANYCAST));
+         }
+
+         Session session0 = connection0.createSession();
+         Queue queue0 = session0.createQueue(address2);
+         MessageConsumer consumer0 = session0.createConsumer(queue0);
+
+         if (!addressFirst) {
+            getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST));
+         }
+
+         if (!divertBeforeConsumer) {
+            getServer(0).deployDivert(new DivertConfiguration().setName(address + ":" + address2)
+                                         .setAddress(address).setExclusive(true).setForwardingAddress(address2)
+                                         .setRoutingType(ComponentConfigurationRoutingType.ANYCAST));
+         }
+
+         assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1,
+                                 1000, 100));
+         final QueueBinding remoteQueueBinding = (QueueBinding) getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address))
+            .getBindings().iterator().next();
+         assertEquals(1, remoteQueueBinding.getQueue().getConsumerCount());
+
+         producer1.send(session1.createTextMessage("hello"));
+         assertNotNull(consumer0.receive(1000));
+
+         //Test consumer is cleaned up after divert destroyed
+         getServer(0).destroyDivert(SimpleString.toSimpleString(address + ":" + address2));
+        // getServer(0).destroyQueue(SimpleString.toSimpleString(address2));
+         assertTrue(Wait.waitFor(() -> remoteQueueBinding.getQueue().getConsumerCount() == 0, 2000, 100));
+      }
+   }
+
    private void testFederatedAddressReplication(String address) throws Exception {
 
       ConnectionFactory cf1 = getCF(1);
@@ -188,37 +328,38 @@ public class FederatedAddressTest extends FederatedTestBase {
          Topic topic0 = session0.createTopic(address);
          MessageConsumer consumer0 = session0.createConsumer(topic0);
 
-         assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1));
+         assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(
+            SimpleString.toSimpleString(address)).getBindings().size() == 1, 2000, 100));
 
          producer.send(session1.createTextMessage("hello"));
 
-         assertNotNull(consumer0.receive(10000));
+         assertNotNull(consumer0.receive(1000));
 
 
          producer.send(session1.createTextMessage("hello"));
 
-         assertNotNull(consumer0.receive(10000));
+         assertNotNull(consumer0.receive(1000));
 
          MessageConsumer consumer1 = session1.createConsumer(topic1);
 
          producer.send(session1.createTextMessage("hello"));
 
-         assertNotNull(consumer1.receive(10000));
-         assertNotNull(consumer0.receive(10000));
+         assertNotNull(consumer1.receive(1000));
+         assertNotNull(consumer0.receive(1000));
          consumer1.close();
 
          //Groups
          producer.send(session1.createTextMessage("hello"));
-         assertNotNull(consumer0.receive(10000));
+         assertNotNull(consumer0.receive(1000));
 
          producer.send(createTextMessage(session1, "groupA"));
 
-         assertNotNull(consumer0.receive(10000));
+         assertNotNull(consumer0.receive(1000));
          consumer1 = session1.createConsumer(topic1);
 
          producer.send(createTextMessage(session1, "groupA"));
-         assertNotNull(consumer1.receive(10000));
-         assertNotNull(consumer0.receive(10000));
+         assertNotNull(consumer1.receive(1000));
+         assertNotNull(consumer0.receive(1000));
 
       }
 
@@ -246,20 +387,17 @@ public class FederatedAddressTest extends FederatedTestBase {
 
 
          producer.send(session1.createTextMessage("hello"));
-
          assertNull(consumer0.receive(100));
 
          FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
          getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
          getServer(0).getFederationManager().deploy();
 
-         Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1);
-
+         Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(
+            SimpleString.toSimpleString(address)).getBindings().size() == 1, 2000, 100);
 
          producer.send(session1.createTextMessage("hello"));
-
-         assertNotNull(consumer0.receive(10000));
-
+         assertNotNull(consumer0.receive(1000));
       }
    }