[1/2] activemq-artemis git commit: This closes #1021

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[1/2] activemq-artemis git commit: This closes #1021

martyntaylor
Repository: activemq-artemis
Updated Branches:
  refs/heads/master f660783df -> e088c2fa2


This closes #1021


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e088c2fa
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e088c2fa
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e088c2fa

Branch: refs/heads/master
Commit: e088c2fa27bcdc21e0ba47c373e43c7eae24e501
Parents: f660783 21b64b3
Author: Martyn Taylor <[hidden email]>
Authored: Thu Feb 16 17:37:50 2017 +0000
Committer: Martyn Taylor <[hidden email]>
Committed: Thu Feb 16 17:37:50 2017 +0000

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java |  12 ++
 .../core/protocol/mqtt/MQTTProtocolHandler.java |   2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java  |   2 +-
 .../protocol/mqtt/MQTTRetainMessageManager.java |   4 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java |  16 ++-
 .../core/protocol/mqtt/MQTTSessionState.java    |   5 +-
 .../protocol/mqtt/MQTTSubscriptionManager.java  |   9 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  48 +++----
 .../core/protocol/openwire/amq/AMQConsumer.java |   5 +-
 .../core/protocol/openwire/amq/AMQSession.java  |   9 +-
 .../protocol/openwire/util/OpenWireUtil.java    |  21 +--
 .../core/config/WildcardConfiguration.java      |  17 ++-
 .../core/postoffice/impl/AddressImpl.java       |   6 +-
 .../postoffice/impl/WildcardAddressManager.java |  11 --
 .../mqtt/imported/MQTTOpenwireTest.java         | 141 +++++++++++++++++++
 .../integration/mqtt/imported/MQTTTest.java     |   2 -
 .../mqtt/imported/MQTTTestSupport.java          |  19 ++-
 .../integration/openwire/OpenWireUtilTest.java  |   9 +-
 18 files changed, 257 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


Reply | Threaded
Open this post in threaded view
|

[2/2] activemq-artemis git commit: https://issues.apache.org/jira/browse/ARTEMIS-815 - support wildcard address configuration in mqtt layer

martyntaylor
https://issues.apache.org/jira/browse/ARTEMIS-815 - support wildcard address configuration in mqtt layer

https://issues.apache.org/jira/browse/ARTEMIS-815 - support wildcard address configuration in mqtt layer - remove old swap method

https://issues.apache.org/jira/browse/ARTEMIS-815 - added tests for mqtt-openwire integration and fixed openwire layer

https://issues.apache.org/jira/browse/ARTEMIS-815 - remove unused imports


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/21b64b3e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/21b64b3e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/21b64b3e

Branch: refs/heads/master
Commit: 21b64b3e4f4fc6dbeaa30ce610ecefa15110100a
Parents: f660783
Author: Dejan Bosanac <[hidden email]>
Authored: Tue Jan 17 15:44:28 2017 +0100
Committer: Martyn Taylor <[hidden email]>
Committed: Thu Feb 16 17:37:50 2017 +0000

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java |  12 ++
 .../core/protocol/mqtt/MQTTProtocolHandler.java |   2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java  |   2 +-
 .../protocol/mqtt/MQTTRetainMessageManager.java |   4 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java |  16 ++-
 .../core/protocol/mqtt/MQTTSessionState.java    |   5 +-
 .../protocol/mqtt/MQTTSubscriptionManager.java  |   9 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  48 +++----
 .../core/protocol/openwire/amq/AMQConsumer.java |   5 +-
 .../core/protocol/openwire/amq/AMQSession.java  |   9 +-
 .../protocol/openwire/util/OpenWireUtil.java    |  21 +--
 .../core/config/WildcardConfiguration.java      |  17 ++-
 .../core/postoffice/impl/AddressImpl.java       |   6 +-
 .../postoffice/impl/WildcardAddressManager.java |  11 --
 .../mqtt/imported/MQTTOpenwireTest.java         | 141 +++++++++++++++++++
 .../integration/mqtt/imported/MQTTTest.java     |   2 -
 .../mqtt/imported/MQTTTestSupport.java          |  19 ++-
 .../integration/openwire/OpenWireUtilTest.java  |   9 +-
 18 files changed, 257 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index decd189..b7f70c6 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -98,6 +98,18 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       this.data = data;
    }
 
+   public SimpleString(final char c) {
+      data = new byte[2];
+
+      byte low = (byte) (c & 0xFF); // low byte
+
+      data[0] = low;
+
+      byte high = (byte) (c >> 8 & 0xFF); // high byte
+
+      data[1] = high;
+   }
+
    // CharSequence implementation
    // ---------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index cfa944a..b3587a3 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -78,7 +78,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
       this.connectionEntry = entry;
       this.connection = connection;
-      this.session = new MQTTSession(this, connection, protocolManager);
+      this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration());
    }
 
    void stop(boolean error) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 2e5a1e9..26886c6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -219,7 +219,7 @@ public class MQTTPublishManager {
    }
 
    private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
-      String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
+      String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
 
       ByteBuf payload;
       switch (message.getType()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 008bcd8..27423d8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -43,7 +43,7 @@ public class MQTTRetainMessageManager {
     * the retained queue and the previous retain message consumed to remove it from the queue.
     */
    void handleRetainedMessage(ServerMessage message, String address, boolean reset) throws Exception {
-      SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));
+      SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
 
       Queue queue = session.getServer().locateQueue(retainAddress);
       if (queue == null) {
@@ -70,7 +70,7 @@ public class MQTTRetainMessageManager {
       // Queue to add the retained messages to
 
       // The address filter that matches all retained message queues.
-      String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
+      String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration());
       BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
 
       // Iterate over all matching retain queues and add the head message to the original queue.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index cf0b4e6..c96beba 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -55,13 +56,18 @@ public class MQTTSession {
 
    private MQTTProtocolManager protocolManager;
 
+
    private boolean isClean;
 
+   private WildcardConfiguration wildcardConfiguration;
+
    public MQTTSession(MQTTProtocolHandler protocolHandler,
                       MQTTConnection connection,
-                      MQTTProtocolManager protocolManager) throws Exception {
+                      MQTTProtocolManager protocolManager,
+                      WildcardConfiguration wildcardConfiguration) throws Exception {
       this.protocolHandler = protocolHandler;
       this.protocolManager = protocolManager;
+      this.wildcardConfiguration = wildcardConfiguration;
 
       this.connection = connection;
 
@@ -181,4 +187,12 @@ public class MQTTSession {
       mqttPublishManager.clean();
       state.clear();
    }
+
+   public WildcardConfiguration getWildcardConfiguration() {
+      return wildcardConfiguration;
+   }
+
+   public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) {
+      this.wildcardConfiguration = wildcardConfiguration;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 9e18bc5..9458f8b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public class MQTTSessionState {
@@ -98,9 +99,9 @@ public class MQTTSessionState {
       return subscriptions.values();
    }
 
-   boolean addSubscription(MqttTopicSubscription subscription) {
+   boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration) {
       synchronized (subscriptions) {
-         addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()), new ConcurrentHashMap<Long, Integer>());
+         addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<Long, Integer>());
 
          MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName());
          if (existingSubscription != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index e012a26..c9e7a94 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -68,7 +68,7 @@ public class MQTTSubscriptionManager {
 
    synchronized void start() throws Exception {
       for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
-         String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName());
+         String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), session.getWildcardConfiguration());
          Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value());
          createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
       }
@@ -164,9 +164,9 @@ public class MQTTSubscriptionManager {
       int qos = subscription.qualityOfService().value();
       String topic = subscription.topicName();
 
-      String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
+      String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
 
-      session.getSessionState().addSubscription(subscription);
+      session.getSessionState().addSubscription(subscription, session.getWildcardConfiguration());
 
       Queue q = createQueueForSubscription(coreAddress, qos);
 
@@ -186,7 +186,8 @@ public class MQTTSubscriptionManager {
 
    // FIXME: Do we need this synchronzied?
    private synchronized void removeSubscription(String address) throws Exception {
-      String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
+      String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address, session.getWildcardConfiguration());
+
       SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
       session.getSessionState().removeSubscription(address);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 4819006..7bc6b84 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -27,6 +27,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 
@@ -65,44 +66,31 @@ public class MQTTUtil {
 
    public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
 
-   public static String convertMQTTAddressFilterToCore(String filter) {
-      return swapMQTTAndCoreWildCards(filter);
+   public static String convertMQTTAddressFilterToCore(String filter, WildcardConfiguration wildcardConfiguration) {
+      return MQTT_WILDCARD.convert(filter, wildcardConfiguration);
    }
 
+   public static class MQTTWildcardConfiguration extends WildcardConfiguration {
+      public MQTTWildcardConfiguration() {
+         setDelimiter('/');
+         setSingleWord('+');
+         setAnyWords('#');
+      }
+   }
+
+   public static final WildcardConfiguration MQTT_WILDCARD = new MQTTWildcardConfiguration();
+
    private static final MQTTLogger logger = MQTTLogger.LOGGER;
 
-   public static String convertCoreAddressFilterToMQTT(String filter) {
+   public static String convertCoreAddressFilterToMQTT(String filter, WildcardConfiguration wildcardConfiguration) {
       if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
          filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
       }
-      return swapMQTTAndCoreWildCards(filter);
+      return wildcardConfiguration.convert(filter, MQTT_WILDCARD);
    }
 
-   public static String convertMQTTAddressFilterToCoreRetain(String filter) {
-      return MQTT_RETAIN_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
-   }
-
-   public static String swapMQTTAndCoreWildCards(String filter) {
-      char[] topicFilter = filter.toCharArray();
-      for (int i = 0; i < topicFilter.length; i++) {
-         switch (topicFilter[i]) {
-            case '/':
-               topicFilter[i] = '.';
-               break;
-            case '.':
-               topicFilter[i] = '/';
-               break;
-            case '*':
-               topicFilter[i] = '+';
-               break;
-            case '+':
-               topicFilter[i] = '*';
-               break;
-            default:
-               break;
-         }
-      }
-      return String.valueOf(topicFilter);
+   public static String convertMQTTAddressFilterToCoreRetain(String filter, WildcardConfiguration wildcardConfiguration) {
+      return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration);
    }
 
    private static ServerMessage createServerMessage(MQTTSession session,
@@ -124,7 +112,7 @@ public class MQTTUtil {
                                                               boolean retain,
                                                               int qos,
                                                               ByteBuf payload) {
-      String coreAddress = convertMQTTAddressFilterToCore(topic);
+      String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
       ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
 
       // FIXME does this involve a copy?

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index ed2783f..77a1a4a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -81,7 +80,7 @@ public class AMQConsumer {
 
       SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
 
-      String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName());
+      String physicalName = session.convertWildcard(openwireDestination.getPhysicalName());
 
       SimpleString address;
 
@@ -97,7 +96,7 @@ public class AMQConsumer {
          serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
       } else {
-         SimpleString queueName = new SimpleString(OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName()));
+         SimpleString queueName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
          try {
             session.getCoreServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
          } catch (ActiveMQQueueExistsException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 5e4304f..7cdd070 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -29,7 +29,6 @@ import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
@@ -57,6 +56,8 @@ import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 import org.jboss.logging.Logger;
 
+import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
+
 public class AMQSession implements SessionCallback {
    private final Logger logger = Logger.getLogger(AMQSession.class);
 
@@ -152,7 +153,7 @@ public class AMQSession implements SessionCallback {
 
       for (ActiveMQDestination openWireDest : dests) {
          if (openWireDest.isQueue()) {
-            SimpleString queueName = new SimpleString(OpenWireUtil.convertWildcard(openWireDest.getPhysicalName()));
+            SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
 
             if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
                throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
@@ -405,6 +406,10 @@ public class AMQSession implements SessionCallback {
       }
    }
 
+   public String convertWildcard(String physicalName) {
+      return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration());
+   }
+
    public ServerSession getCoreSession() {
       return this.coreSession;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 04bd6a3..5355c63 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -29,6 +30,16 @@ import org.apache.activemq.util.ByteSequence;
 
 public class OpenWireUtil {
 
+   public static class OpenWireWildcardConfiguration extends WildcardConfiguration {
+      public OpenWireWildcardConfiguration() {
+         setDelimiter('.');
+         setSingleWord('*');
+         setAnyWords('>');
+      }
+   }
+
+   public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration();
+
    public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
       ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
 
@@ -52,16 +63,6 @@ public class OpenWireUtil {
       }
    }
 
-   /*
-    *This util converts amq wildcards to compatible core wildcards
-    *The conversion is like this:
-    *AMQ * wildcard --> Core * wildcard (no conversion)
-    *AMQ > wildcard --> Core # wildcard
-    */
-   public static String convertWildcard(String physicalName) {
-      return physicalName.replaceAll("(\\.>)+", ".#");
-   }
-
    public static XidImpl toXID(TransactionId xaXid) {
       return toXID((XATransactionId) xaXid);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
index 10c9cf2..bd9046b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
@@ -57,8 +57,17 @@ public class WildcardConfiguration implements Serializable {
       return result;
    }
 
-   public boolean isEnabled() {
+   @Override
+   public String toString() {
+      return "WildcardConfiguration{" +
+              "anyWords=" + anyWords +
+              ", enabled=" + enabled +
+              ", singleWord=" + singleWord +
+              ", delimiter=" + delimiter +
+              '}';
+   }
 
+   public boolean isEnabled() {
       return enabled;
    }
 
@@ -90,4 +99,10 @@ public class WildcardConfiguration implements Serializable {
       this.singleWord = singleWord;
    }
 
+   public String convert(String filter, WildcardConfiguration to) {
+      return filter.replace(getDelimiter(), to.getDelimiter())
+              .replace(getSingleWord(), to.getSingleWord())
+              .replace(getAnyWords(), to.getAnyWords());
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java
index 107ce61..ea78e4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java
@@ -93,15 +93,15 @@ public class AddressImpl implements Address {
       for (; matchPos < add.getAddressParts().length; ) {
          if (pos >= addressParts.length) {
             // test for # as last address part
-            return pos + 1 == add.getAddressParts().length && add.getAddressParts()[pos].equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING);
+            return pos + 1 == add.getAddressParts().length && add.getAddressParts()[pos].equals(new SimpleString(wildcardConfiguration.getAnyWords()));
          }
          SimpleString curr = addressParts[pos];
          SimpleString next = addressParts.length > pos + 1 ? addressParts[pos + 1] : null;
          SimpleString currMatch = add.getAddressParts()[matchPos];
-         if (currMatch.equals(WildcardAddressManager.SINGLE_WORD_SIMPLESTRING)) {
+         if (currMatch.equals(new SimpleString(wildcardConfiguration.getSingleWord()))) {
             pos++;
             matchPos++;
-         } else if (currMatch.equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING)) {
+         } else if (currMatch.equals(new SimpleString(wildcardConfiguration.getAnyWords()))) {
             if (matchPos == addressParts.length - 1) {
                pos++;
                matchPos++;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
index 5ca1b02..516cf37 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java
@@ -34,16 +34,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  */
 public class WildcardAddressManager extends SimpleAddressManager {
 
-   static final char SINGLE_WORD = '*';
-
-   static final char ANY_WORDS = '#';
-
-   static final char DELIM = '.';
-
-   static final SimpleString SINGLE_WORD_SIMPLESTRING = new SimpleString("*");
-
-   static final SimpleString ANY_WORDS_SIMPLESTRING = new SimpleString("#");
-
    /**
     * These are all the addresses, we use this so we can link back from the actual address to its linked wilcard addresses
     * or vice versa
@@ -175,7 +165,6 @@ public class WildcardAddressManager extends SimpleAddressManager {
             if (actualAddress.matches(destAdd)) {
                destAdd.addLinkedAddress(actualAddress);
                actualAddress.addLinkedAddress(destAdd);
-
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java
new file mode 100644
index 0000000..9d3f8fb
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTOpenwireTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Test;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class MQTTOpenwireTest extends MQTTTestSupport {
+
+   protected static final int NUM_MESSAGES = 1;
+
+   @Override
+   public void configureBroker() throws Exception {
+      super.configureBroker();
+      WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
+      wildcardConfiguration.setDelimiter('.');
+      wildcardConfiguration.setSingleWord('*');
+      wildcardConfiguration.setAnyWords('>');
+      server.getConfiguration().setWildCardConfiguration(wildcardConfiguration);
+   }
+
+   @Override
+   public void createJMSConnection() throws Exception {
+      cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
+   }
+
+   @Test
+   public void testWildcards() throws Exception {
+      doTestSendJMSReceiveMQTT("foo.bar", "foo/+");
+      doTestSendJMSReceiveMQTT("foo.bar", "foo/#");
+      doTestSendJMSReceiveMQTT("foo.bar.har", "foo/#");
+      doTestSendJMSReceiveMQTT("foo.bar.har", "foo/+/+");
+      doTestSendMQTTReceiveJMS("foo/bah", "foo.*");
+      doTestSendMQTTReceiveJMS("foo/bah", "foo.>");
+      doTestSendMQTTReceiveJMS("foo/bah/hah", "foo.*.*");
+      doTestSendMQTTReceiveJMS("foo/bah/har", "foo.>");
+   }
+
+   public void doTestSendMQTTReceiveJMS(String mqttTopic, String jmsDestination) throws Exception {
+      final MQTTClientProvider provider = getMQTTClientProvider();
+      initializeConnection(provider);
+
+      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
+
+      try {
+         // MUST set to true to receive retained messages
+         activeMQConnection.setUseRetroactiveConsumer(true);
+         activeMQConnection.start();
+         Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Topic jmsTopic = s.createTopic(jmsDestination);
+         MessageConsumer consumer = s.createConsumer(jmsTopic);
+
+         // send retained message
+         final String RETAINED = "RETAINED";
+         provider.publish(mqttTopic, RETAINED.getBytes(), AT_LEAST_ONCE, true);
+
+         // check whether we received retained message on JMS subscribe
+         ActiveMQMessage message = (ActiveMQMessage) consumer.receive(2000);
+         assertNotNull("Should get retained message " + mqttTopic + "->" + jmsDestination, message);
+         ByteSequence bs = message.getContent();
+         assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+
+         for (int i = 0; i < 1; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish(mqttTopic, payload.getBytes(), AT_LEAST_ONCE);
+            message = (ActiveMQMessage) consumer.receive(1000);
+            assertNotNull("Should get a message " + mqttTopic + "->" + jmsDestination, message);
+            bs = message.getContent();
+            assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+         }
+      } finally {
+         activeMQConnection.close();
+         provider.disconnect();
+      }
+   }
+
+   public void doTestSendJMSReceiveMQTT(String jmsDestination, String mqttTopic) throws Exception {
+      final MQTTClientProvider provider = getMQTTClientProvider();
+      initializeConnection(provider);
+
+      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
+      try {
+         activeMQConnection.setUseRetroactiveConsumer(true);
+         activeMQConnection.start();
+         Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Topic jmsTopic = s.createTopic(jmsDestination);
+         MessageProducer producer = s.createProducer(jmsTopic);
+
+         final String RETAINED = "RETAINED";
+         provider.subscribe(mqttTopic, AT_MOST_ONCE);
+
+         // send retained message from JMS
+         TextMessage sendMessage = s.createTextMessage(RETAINED);
+         // mark the message to be retained
+         sendMessage.setBooleanProperty("ActiveMQ.Retain", true);
+         // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
+         sendMessage.setIntProperty("ActiveMQ.MQTT.QoS", 0);
+         producer.send(sendMessage);
+
+         byte[] message = provider.receive(2000);
+         assertNotNull("Should get retained message " + jmsDestination + "->" + mqttTopic, message);
+         assertEquals(RETAINED, new String(message));
+
+         for (int i = 0; i < 1; i++) {
+            String payload = "This is Test Message: " + i;
+            sendMessage = s.createTextMessage(payload);
+            producer.send(sendMessage);
+            message = provider.receive(1000);
+            assertNotNull("Should get a message " + jmsDestination + "->" + mqttTopic, message);
+
+            assertEquals(payload, new String(message));
+         }
+      } finally {
+         activeMQConnection.close();
+         provider.disconnect();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index d359f2e..a26a046 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -67,8 +67,6 @@ public class MQTTTest extends MQTTTestSupport {
 
    private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
 
-   private static final int NUM_MESSAGES = 250;
-
    @Override
    @Before
    public void setUp() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index b578f97..a45f06d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.artemis.tests.integration.mqtt.imported;
 
+import javax.jms.ConnectionFactory;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
@@ -58,18 +59,20 @@ import static java.util.Collections.singletonList;
 
 public class MQTTTestSupport extends ActiveMQTestBase {
 
-   private ActiveMQServer server;
+   protected ActiveMQServer server;
 
    private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
 
    protected int port = 1883;
-   protected ActiveMQConnectionFactory cf;
+   protected ConnectionFactory cf;
    protected LinkedList<Throwable> exceptions = new LinkedList<>();
    protected boolean persistent;
    protected String protocolConfig;
    protected String protocolScheme;
    protected boolean useSSL;
 
+   protected static final int NUM_MESSAGES = 250;
+
    public static final int AT_MOST_ONCE = 0;
    public static final int AT_LEAST_ONCE = 1;
    public static final int EXACTLY_ONCE = 2;
@@ -80,7 +83,6 @@ public class MQTTTestSupport extends ActiveMQTestBase {
    public MQTTTestSupport() {
       this.protocolScheme = "mqtt";
       this.useSSL = false;
-      cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY));
    }
 
    public File basedir() throws IOException {
@@ -110,6 +112,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
 
       exceptions.clear();
       startBroker();
+      createJMSConnection();
    }
 
    @Override
@@ -125,7 +128,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       super.tearDown();
    }
 
-   public void startBroker() throws Exception {
+   public void configureBroker() throws Exception {
       // TODO Add SSL
       super.setUp();
       server = createServerForMQTT();
@@ -137,10 +140,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       addressSettings.setAutoCreateAddresses(true);
 
       server.getAddressSettingsRepository().addMatch("#", addressSettings);
+   }
+
+   public void startBroker() throws Exception {
+      configureBroker();
       server.start();
       server.waitForActivation(10, TimeUnit.SECONDS);
    }
 
+   public void createJMSConnection() throws Exception {
+      cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY));
+   }
+
    private ActiveMQServer createServerForMQTT() throws Exception {
       Configuration defaultConfig = createDefaultConfig(true).setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName())).setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
       AddressSettings addressSettings = new AddressSettings();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21b64b3e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
index 4f2696d..5c92937 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.openwire;
 
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.junit.Test;
 
@@ -26,15 +27,15 @@ public class OpenWireUtilTest {
    @Test
    public void testWildcardConversion() throws Exception {
       String amqTarget = "TEST.ONE.>";
-      String coreTarget = OpenWireUtil.convertWildcard(amqTarget);
+      String coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration());
       assertEquals("TEST.ONE.#", coreTarget);
 
       amqTarget = "TEST.*.ONE";
-      coreTarget = OpenWireUtil.convertWildcard(amqTarget);
+      coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration());
       assertEquals("TEST.*.ONE", coreTarget);
 
       amqTarget = "a.*.>.>";
-      coreTarget = OpenWireUtil.convertWildcard(amqTarget);
-      assertEquals("a.*.#", coreTarget);
+      coreTarget = OpenWireUtil.OPENWIRE_WILDCARD.convert(amqTarget, new WildcardConfiguration());
+      assertEquals("a.*.#.#", coreTarget);
    }
 }