activemq-cli-tools git commit: AMQCLI-4, AMQCLI-5 Adding MetadataExporter abstraction

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

activemq-cli-tools git commit: AMQCLI-4, AMQCLI-5 Adding MetadataExporter abstraction

cshannon
Repository: activemq-cli-tools
Updated Branches:
  refs/heads/master 76ca845ff -> b8d33cdde


AMQCLI-4, AMQCLI-5 Adding MetadataExporter abstraction

this will allow a pluggable implementation to export metadata


Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/b8d33cdd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/b8d33cdd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/b8d33cdd

Branch: refs/heads/master
Commit: b8d33cddecd5fd9a5cb2eee8550c3aa2ba704a37
Parents: 76ca845
Author: Christopher L. Shannon (cshannon) <[hidden email]>
Authored: Thu Feb 16 09:18:04 2017 -0500
Committer: Christopher L. Shannon (cshannon) <[hidden email]>
Committed: Thu Feb 16 09:18:04 2017 -0500

----------------------------------------------------------------------
 .../activemq/cli/kahadb/exporter/Exporter.java  | 43 +++---------
 .../cli/kahadb/exporter/KahaDBExporter.java     | 47 ++++++-------
 .../kahadb/exporter/MessageStoreExporter.java   |  2 +
 .../exporter/MessageStoreMetadataExporter.java  | 24 +++++++
 .../artemis/ArtemisXmlMetadataExporter.java     | 74 ++++++++++++++++++++
 5 files changed, 130 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index 4439a88..fee79bf 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -25,13 +25,9 @@ import java.util.zip.GZIPOutputStream;
 import javax.xml.stream.XMLOutputFactory;
 import javax.xml.stream.XMLStreamWriter;
 
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
 import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
-import org.apache.activemq.cli.schema.QueueBindingType;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMetadataExporter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,43 +63,20 @@ public class Exporter {
         try(OutputStream fos = new BufferedOutputStream(compress ? new GZIPOutputStream(
                 new FileOutputStream(artemisXml)) : new FileOutputStream(artemisXml))) {
 
-            XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
-            ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+            final XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
+            final ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+            final KahaDBExporter dbExporter = new KahaDBExporter(adapter,
+                    new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller),
+                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
 
             xmlMarshaller.appendJournalOpen();
             xmlMarshaller.appendBindingsElement();
-
-            adapter.getStore().getDestinations().stream()
-                .forEach(dest -> {
-                    try {
-                        if (dest.isQueue()) {
-                            xmlMarshaller.appendBinding(QueueBindingType.builder()
-                                    .withName(dest.getPhysicalName())
-                                    .withRoutingType(RoutingType.ANYCAST.toString())
-                                    .withAddress(dest.getPhysicalName()).build());
-                        } else if (dest.isTopic()) {
-                                for (SubscriptionInfo info :
-                                    adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
-                                    xmlMarshaller.appendBinding(QueueBindingType.builder()
-                                            .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
-                                                    true, info.getClientId(), info.getSubcriptionName()))
-                                            .withRoutingType(RoutingType.MULTICAST.toString())
-                                            .withAddress(dest.getPhysicalName()).build());
-                                }
-                        }
-                    } catch (Exception e) {
-                        throw new IllegalStateException(e);
-                    }
-                });
-
+            dbExporter.exportMetadata();
             xmlMarshaller.appendEndElement();
             xmlMarshaller.appendMessagesElement();
-
-            KahaDBExporter dbExporter = new KahaDBExporter(adapter,
-                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
-
             dbExporter.exportQueues();
             dbExporter.exportTopics();
+            xmlMarshaller.appendEndElement();
             xmlMarshaller.appendJournalClose(true);
         } finally {
             adapter.stop();

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
index 7eee0aa..dbe0114 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
@@ -25,7 +25,6 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.slf4j.Logger;
@@ -36,26 +35,44 @@ public class KahaDBExporter implements MessageStoreExporter {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBExporter.class);
 
     private final KahaDBPersistenceAdapter adapter;
+    private final MessageStoreMetadataExporter metadataExporter;
     private final MessageRecoveryListener recoveryListener;
 
-    public KahaDBExporter (final KahaDBPersistenceAdapter adapter,
+    public KahaDBExporter(final KahaDBPersistenceAdapter adapter,
+            final MessageStoreMetadataExporter metadataExporter,
             final MessageRecoveryListener recoveryListener) {
         this.adapter = adapter;
+        this.metadataExporter = metadataExporter;
         this.recoveryListener = recoveryListener;
     }
 
+
+    @Override
+    public void exportMetadata() throws IOException {
+        metadataExporter.export();
+    }
+
     @Override
     public void exportQueues() throws IOException {
+        exportDestinations(ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    @Override
+    public void exportTopics() throws IOException {
+        exportDestinations(ActiveMQDestination.TOPIC_TYPE);
+    }
 
+    private void exportDestinations(byte destType) throws IOException {
         final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter(
-                dest -> dest.isQueue()).collect(Collectors.toSet());
+                dest -> dest.getDestinationType() == destType).collect(Collectors.toSet());
 
         // loop through all queues and export them
         for (final ActiveMQDestination destination : destinations) {
 
             LOG.info("Starting export of: " + destination);
-            final ActiveMQQueue queue = (ActiveMQQueue) destination;
-            final MessageStore messageStore = adapter.createQueueMessageStore(queue);
+            final MessageStore messageStore = destination.isQueue() ?
+                    adapter.createQueueMessageStore((ActiveMQQueue) destination) :
+                    adapter.createTopicMessageStore((ActiveMQTopic) destination);
 
             try {
                 // migrate the data
@@ -66,24 +83,4 @@ public class KahaDBExporter implements MessageStoreExporter {
         }
     }
 
-    @Override
-    public void exportTopics() throws IOException {
-
-        final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter(
-                dest -> dest.isTopic()).collect(Collectors.toSet());
-
-        for (ActiveMQDestination destination : destinations) {
-            LOG.info("Starting export of: " + destination);
-
-            final ActiveMQTopic topic = (ActiveMQTopic) destination;
-            final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
-
-            //recover topic
-            try {
-                messageStore.recover(recoveryListener);
-            } catch (Exception e) {
-                IOExceptionSupport.create(e);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
index b228e19..b1217b4 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 
 public interface MessageStoreExporter {
 
+    public void exportMetadata() throws IOException;
+
     public void exportQueues() throws IOException;
 
     public void exportTopics() throws IOException;

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java
new file mode 100644
index 0000000..994528b
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreMetadataExporter.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.IOException;
+
+public interface MessageStoreMetadataExporter {
+
+    public void export() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/b8d33cdd/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
new file mode 100644
index 0000000..216a6a3
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter.artemis;
+
+import java.io.IOException;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
+import org.apache.activemq.cli.kahadb.exporter.MessageStoreMetadataExporter;
+import org.apache.activemq.cli.schema.QueueBindingType;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+public class ArtemisXmlMetadataExporter implements MessageStoreMetadataExporter {
+
+    private final KahaDBStore store;
+    private final ArtemisJournalMarshaller xmlMarshaller;
+
+
+    /**
+     * @param xmlMarshaller
+     */
+    public ArtemisXmlMetadataExporter(final KahaDBStore store,
+            final ArtemisJournalMarshaller xmlMarshaller) {
+        super();
+        this.store = store;
+        this.xmlMarshaller = xmlMarshaller;
+    }
+
+    @Override
+    public void export() throws IOException {
+        store.getDestinations().stream()
+        .forEach(dest -> {
+            try {
+                if (dest.isQueue()) {
+                    xmlMarshaller.appendBinding(QueueBindingType.builder()
+                            .withName(dest.getPhysicalName())
+                            .withRoutingType(RoutingType.ANYCAST.toString())
+                            .withAddress(dest.getPhysicalName()).build());
+                } else if (dest.isTopic()) {
+                        for (SubscriptionInfo info :
+                            store.createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
+                            xmlMarshaller.appendBinding(QueueBindingType.builder()
+                                    .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
+                                            true, info.getClientId(), info.getSubcriptionName()))
+                                    .withRoutingType(RoutingType.MULTICAST.toString())
+                                    .withAddress(dest.getPhysicalName()).build());
+                        }
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        });
+
+    }
+
+}