[GitHub] activemq-artemis pull request #1607: Artemis Kafka Integration Bridge


[GitHub] activemq-artemis pull request #1607: Artemis Kafka Integration Bridge

GitHub user michaelandrepearce opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1607

    Artemis Kafka Integration Bridge

    ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridge
   
    Initial Kafka bridge for Apache ActiveMQ Artemis to Apache Kafka, using the Connector Service interface.
    Supports handling Core or AMQP based protocols over Kafka.
    Ensures a Core TextMessage can map to StringSerializer for a consumer.
    Ensures a Core ByteMessage can map to ByteArraySerializer for a consumer.
    Kafka Serdes allow Kafka consumers to deserialize Core or AMQP payloads back to CoreMessage, ProtonMessage, or JMSMessage (a consumer-side sketch follows below).
    Adds documentation.
    Adds integration tests.
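
As a rough, consumer-side illustration of the TextMessage-to-StringSerializer mapping listed above, here is a minimal sketch of a plain Kafka consumer reading bridged text payloads back as strings. It assumes a recent kafka-clients API; the bootstrap server, group id, and topic name are hypothetical placeholders.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class BridgedTextMessageConsumer {
       public static void main(String[] args) {
          Properties props = new Properties();
          // Hypothetical broker address and group id, for illustration only.
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1.domain.local:9092");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
          // A bridged Core TextMessage payload can be read back with the stock StringDeserializer.
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
             consumer.subscribe(Collections.singletonList("my_kafka_topic"));
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
             for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " -> " + record.value());
             }
          }
       }
    }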

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michaelandrepearce/activemq-artemis ARTEMIS-1478

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1607.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1607
   
----
commit 77937f23864425d544a7842e13e71b4cd39c0608
Author: Michael André Pearce <[hidden email]>
Date:   2017-10-24T07:51:36Z

    Artemis Kafka Integration Bridge
   
   
    ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridge
   
    Initial Kafka bridge for Apache ActiveMQ Artemis to Apache Kafka, using the Connector Service interface.
    Supports handling Core or AMQP based protocols over Kafka.
    Ensures a Core TextMessage can map to StringSerializer for a consumer.
    Ensures a Core ByteMessage can map to ByteArraySerializer for a consumer.
    Kafka Serdes allow Kafka consumers to deserialize Core or AMQP payloads back to CoreMessage, ProtonMessage, or JMSMessage.
    Adds documentation.
    Adds integration tests.

----


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146498875
 
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,179 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +![ActiveMQ Artemis Kafka Bridge Logo](images/activemq-kafka-bridge.png)
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis,
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you can have a hybrid broker setup,
    +with a data flow involving CORE, AMQP, and MQTT clients, as well as Kafka clients,
    +taking the best features of both broker technologies when and where needed.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single
    +direction, from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote, making bridging suitable
    +for reliably sending messages from an Artemis cluster to Kafka,
    +for instance across a WAN, to the cloud, or over the internet, where the connection may be unreliable.
    +
    +The bridge has built-in resilience to failure, so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect a separate
    +Apache ActiveMQ Artemis server and an Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.
    +Let's kick off with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a Kafka bridge. See below for a complete list of available configuration options.
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using:
    +          
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers,
    +and then maps the payload binary as the Record value, encoding a TextMessage's text as string bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers:
    +a TextMessage using StringDeserializer, and
    +a ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized an Apache Avro object into a byte array for the ByteMessage you could deserialize using an e
    +
    --- End diff --
   
    is this a broken line?


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146526503
 
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,179 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +![ActiveMQ Artemis Kafka Bridge Logo](images/activemq-kafka-bridge.png)
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis,
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you can have a hybrid broker setup,
    +with a data flow involving CORE, AMQP, and MQTT clients, as well as Kafka clients,
    +taking the best features of both broker technologies when and where needed.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single
    +direction, from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote, making bridging suitable
    +for reliably sending messages from an Artemis cluster to Kafka,
    +for instance across a WAN, to the cloud, or over the internet, where the connection may be unreliable.
    +
    +The bridge has built-in resilience to failure, so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect a separate
    +Apache ActiveMQ Artemis server and an Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.
    +Let's kick off with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a Kafka bridge. See below for a complete list of available configuration options.
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using:
    +          
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers,
    +and then maps the payload binary as the Record value, encoding a TextMessage's text as string bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers:
    +a TextMessage using StringDeserializer, and
    +a ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized an Apache Avro object into a byte array for the ByteMessage you could deserialize using an e
    +
    --- End diff --
   
    Good spot, looks like something didn't commit properly from my local; I'll correct it.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146532318
 
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,179 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +![ActiveMQ Artemis Kafka Bridge Logo](images/activemq-kafka-bridge.png)
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis,
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you can have a hybrid broker setup,
    +with a data flow involving CORE, AMQP, and MQTT clients, as well as Kafka clients,
    +taking the best features of both broker technologies when and where needed.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single
    +direction, from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote, making bridging suitable
    +for reliably sending messages from an Artemis cluster to Kafka,
    +for instance across a WAN, to the cloud, or over the internet, where the connection may be unreliable.
    +
    +The bridge has built-in resilience to failure, so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect a separate
    +Apache ActiveMQ Artemis server and an Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.
    +Let's kick off with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a Kafka bridge. See below for a complete list of available configuration options.
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using:
    +          
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers,
    +and then maps the payload binary as the Record value, encoding a TextMessage's text as string bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers:
    +a TextMessage using StringDeserializer, and
    +a ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized an Apache Avro object into a byte array for the ByteMessage you could deserialize using an e
    +
    --- End diff --
   
    Pushed. It turns out I did actually just stop mid-sentence while writing the doc; I have completed the section now.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146578236
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    --- End diff --
   
    I'm not an expert on Artemis, but can you explain to me why the LVQ is used as the key for partitions?


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146578570
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
   
    It's the same way the DefaultPartitioner works in Kafka; why did you re-use the logic here?
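
For reference, the scheme being discussed is the same hash-and-modulo computation Kafka's DefaultPartitioner applies to a record key, here applied to the Artemis group id instead. A minimal sketch of that computation using Kafka's own Utils helpers (the group id value and partition count are made up):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.utils.Utils;

    public class GroupIdPartitionSketch {

       // Mirrors the bridge's computation: murmur2-hash the group id bytes,
       // force the result positive, then take it modulo the partition count.
       static int partitionForGroupId(byte[] groupIdBytes, int numPartitions) {
          return Utils.toPositive(Utils.murmur2(groupIdBytes)) % numPartitions;
       }

       public static void main(String[] args) {
          byte[] groupId = "order-group-42".getBytes(StandardCharsets.UTF_8);
          // With e.g. 12 partitions, every message carrying this group id
          // deterministically lands on the same partition.
          System.out.println(partitionForGroupId(groupId, 12));
       }
    }

Computing the partition in the bridge itself, rather than leaving it to key-based partitioning, keeps group affinity intact even though the record key is reserved for compaction (see the later discussion).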


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146579371
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    +
    +         ProducerRecord<String, Message> producerRecord =
    +             new ProducerRecord<>(
    +                   topicName,
    +                   partition,
    +                   message.getTimestamp(),
    +                   key == null ? null : key.toString(),
    +                   message);
    --- End diff --
   
    Providing both partition and key (if it's not null), the Kafka producer will always use the partition, which takes precedence over the key. It means that if a groupId is specified, the LVQ (as key) won't be considered for addressing the partition. Is that exactly what you had in mind?
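
To make that precedence concrete, a small sketch of the record construction under discussion (the helper name is hypothetical): when the ProducerRecord constructor receives a non-null partition, the producer sends to that partition and the key no longer influences partitioning; it only travels with the record (and drives log compaction).

    import org.apache.activemq.artemis.api.core.Message;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionPrecedenceSketch {

       static ProducerRecord<String, Message> toRecord(String topic, Integer partition,
                                                       String lastValueKey, Message message) {
          // With an explicit (non-null) partition, key-based partitioning is bypassed:
          // the record goes to 'partition' regardless of 'lastValueKey'.
          return new ProducerRecord<>(topic, partition, message.getTimestamp(), lastValueKey, message);
       }
    }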


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146586161
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    --- End diff --
   
    It's not used for partitioning; the key in Kafka is used for compaction. We use the groupId (JMS MessageGroup) to select the partition, see the code above.
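    To make the mapping concrete, here is a minimal, hypothetical producer sketch (topic name, broker address and values are illustrative, not taken from the PR): the partition is derived from the group id with the same murmur2 hashing the bridge applies, while the record key is left free for compaction.

        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.apache.kafka.common.utils.Utils;

        public class GroupPartitioningSketch {
           public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
              props.put("key.serializer", StringSerializer.class.getName());
              props.put("value.serializer", StringSerializer.class.getName());

              try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                 String groupId = "orders-group-1"; // stands in for the JMS message group
                 String lvqKey = "order-42";        // stands in for the Artemis LVQ property
                 int numPartitions = producer.partitionsFor("my-topic").size();

                 // The partition is chosen from the group id, mirroring the bridge's hashing...
                 int partition = Utils.toPositive(Utils.murmur2(groupId.getBytes())) % numPartitions;

                 // ...while the record key is reserved for log compaction.
                 producer.send(new ProducerRecord<>("my-topic", partition, lvqKey, "payload"));
              }
           }
        }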


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

pgfox
In reply to this post by pgfox
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146586304
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    [... diff context identical to the quote earlier in the thread ...]
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    +
    +         ProducerRecord<String, Message> producerRecord =
    +             new ProducerRecord<>(
    +                   topicName,
    +                   partition,
    +                   message.getTimestamp(),
    +                   key == null ? null : key.toString(),
    +                   message);
    --- End diff --
   
    Yes, the key is primarily used for compaction in Kafka. We partition by the groupId, if present, to keep similar semantics: Kafka consumers consume by partition, so a message group is honoured.
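    As a consumer-side illustration of why this preserves grouping, here is a minimal, hypothetical sketch (broker address, group and topic names are assumptions): within a consumer group each partition is assigned to at most one consumer, so every record the bridge routed to a given partition, i.e. a given message group, is read in order by a single consumer.

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        public class GroupOrderingSketch {
           public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
              props.put("group.id", "bridge-readers");          // assumed consumer group
              props.put("key.deserializer", StringDeserializer.class.getName());
              props.put("value.deserializer", StringDeserializer.class.getName());

              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(Collections.singletonList("my-topic"));
                 // Each partition is owned by one consumer in the group, so records
                 // from one message group arrive in order at one consumer.
                 ConsumerRecords<String, String> records = consumer.poll(1000);
                 for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d key=%s value=%s%n",
                          record.partition(), record.key(), record.value());
                 }
              }
           }
        }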


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

pgfox
In reply to this post by pgfox
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146586855
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    [... diff context identical to the quote earlier in the thread ...]
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
   
    You don't have access to native headers in the partitioner. Because the serializers encode the payload into the record value and map message headers (the groupID) to Kafka native headers, the groupID is not available within a Kafka partitioner class.
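    For reference, this is the callback in question: Kafka's Partitioner interface receives only the topic, key, value and cluster metadata, so record headers (where the serializer would place the groupID) are out of reach. A minimal sketch of a custom partitioner (the class name is hypothetical):

        import java.util.Map;
        import org.apache.kafka.clients.producer.Partitioner;
        import org.apache.kafka.common.Cluster;
        import org.apache.kafka.common.utils.Utils;

        public class GroupIdPartitioner implements Partitioner {

           @Override
           public void configure(Map<String, ?> configs) {
              // no configuration needed for this sketch
           }

           @Override
           public int partition(String topic, Object key, byte[] keyBytes,
                                Object value, byte[] valueBytes, Cluster cluster) {
              // Only the key/value (and their serialized bytes) are visible here;
              // a groupID carried in a record header cannot be read at this point.
              int numPartitions = cluster.partitionsFor(topic).size();
              return keyBytes == null ? 0
                    : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
           }

           @Override
           public void close() {
           }
        }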


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user gemmellr commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    Skimming the code quickly, I had to do a double take on a couple of occasions, and had some related observations.
   
    There looked to be extensive use of implementation-detail classes from the Qpid JMS client here. This seems inherently fragile, as these classes are not intended for direct application usage in this fashion and are entirely subject to change from release to release. Relatedly, there also appeared to be some new class definitions within the org.apache.qpid namespace, which doesn't seem nice either.
   
    Separately, I don't really understand why they would need/want to be used this way anyway. Presumably the Kafka message content in that case is an encoded AMQP message, so anything that can decode an AMQP message would work, e.g. the Proton-based bits also included. It's not clear to me why it would be desirable to involve JMS at all. Regardless, if you actually wanted a JMS message, why not consume it from the broker using the JMS client to begin with?
   
    The logo feels like something only a standalone component would have.


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    @gemmellr agreed on the class definitions in that namespace; this was because we couldn't create the objects, as the methods are package-private. As noted, it would be good if we could get Qpid to make them public so they are reusable.
   
    Yes, anything can work; we are simply providing an implementation to make things easy out of the box.
   
    I'm happy to remove the Qpid JMS one. It is just for end users, giving them, as I said, an out-of-the-box solution on the consumer side, but it's not actually needed by the bridge.
   
   



---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user tabish121 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    Given that the code being used from Qpid JMS is internal implementation with no promise of remaining stable, and will not be made public, I think that stuff should go.
   
    So far, from what I see, this doesn't seem like something that should go into the broker project; it could instead be a stand-alone project. I'd prefer not to drag Kafka dependencies into the distribution when they are not needed.


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    @gemmellr I've removed the qpid JMS bit.
   
    @tabish121 I see this as no different to the Vert.x connector service etc. that was/is in the 1.5.x range.


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user gemmellr commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    @michaelandrepearce Ah, I missed the note while skimming. The methods will presumably be at package visibility in keeping with the fact they are not intended to be used outside the client itself, so I don't see that changing. I think removing the JMS bits makes sense.


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    @gemmellr I have removed it entirely to avoid the issue.


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user tabish121 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    @michaelandrepearce apart from the fact that those are all removed now?  


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

pgfox
In reply to this post by pgfox
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
 
    There wasn't a JIRA to remove them; they just didn't get handled in the move to 2.x, and no one has done the work to update and re-add them. It's not that we don't want connectors.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

pgfox
In reply to this post by pgfox
Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146623227
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    [... diff context identical to the quote earlier in the thread ...]
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
   
    "You don't have access to native headers in the partitioner" ...  I didn't get this point. Serialization and Partitioning are two different steps in the Kafka producer pipeline (first serialization, then partitioning). About the partitioner, if you pass the key to the producer send method, it applies exactly this :
   
    `Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions`
   
    where `keyBytes` is just  `groupid`. Isn't it ?
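    For what it's worth, that equivalence is easy to check in isolation; a tiny sketch (the group id and partition count are made up):

        import org.apache.kafka.common.utils.Utils;

        public class DefaultPartitionCheck {
           public static void main(String[] args) {
              byte[] keyBytes = "orders-group-1".getBytes(); // hypothetical group id
              int numPartitions = 6;                         // assumed partition count
              // The computation Kafka's DefaultPartitioner applies to a non-null key,
              // and the same one the bridge applies to the groupID bytes.
              int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
              System.out.println("group would land on partition " + partition);
           }
        }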


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

pgfox
In reply to this post by pgfox
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146624344
 
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    [... diff context identical to the quote earlier in the thread ...]
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
   
    No, the `keyBytes` would be the LVQ property, if set, since the key is used for compacted topics.
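    To illustrate the compaction angle: on a compacted topic Kafka retains only the newest record per key, so keying records by the Artemis last-value (LVQ) property reproduces last-value semantics on the Kafka side. A minimal, hypothetical sketch using the AdminClient (topic name and broker address are assumptions):

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.AdminClient;
        import org.apache.kafka.clients.admin.NewTopic;
        import org.apache.kafka.common.config.TopicConfig;

        public class CompactedTopicSketch {
           public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

              try (AdminClient admin = AdminClient.create(props)) {
                 // cleanup.policy=compact keeps only the latest record per key,
                 // mirroring Artemis last-value queue behaviour for that key.
                 NewTopic prices = new NewTopic("prices", 3, (short) 1)
                       .configs(Collections.singletonMap(
                             TopicConfig.CLEANUP_POLICY_CONFIG,
                             TopicConfig.CLEANUP_POLICY_COMPACT));
                 admin.createTopics(Collections.singletonList(prices)).all().get();
              }
           }
        }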


---