[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
GitHub user franz1981 opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1390

    ARTEMIS-1273 Bounded OrderedExecutor

    Introduce by default a BoundedOrderedExecutor that:
    1. uses Array based lock/wait-free queues to have contiguos allocations
    2. OOM protected limiting the allowed max size of the queue
    3. produce garbage only if the consumer can't keep up with the producer by a configurable amount
    4. uses a scalable handmade spin lock (XADD based)
    5. cost of monitoring pending tasks list size O(1)
    6. allow optimized versions of task submission using different specialized concurrent queues

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/franz1981/activemq-artemis jctools_ordered_executor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1390.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1390
   
----
commit 220ed672d7da50dd316dfb221effcd455c3cc163
Author: Francesco Nigro <[hidden email]>
Date:   2016-12-19T15:02:17Z

    ARTEMIS-1273 Bounded OrderedExecutor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1390#discussion_r126428498
 
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils;
    +
    +import java.util.Queue;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
    +import org.jboss.logging.Logger;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscChunkedArrayQueue;
    +import org.jctools.queues.SpscArrayQueue;
    +import org.jctools.queues.SpscChunkedArrayQueue;
    +import org.jctools.util.Pow2;
    +
    +/**
    + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
    + */
    +public final class BoundedOrderedExecutorFactory implements ExecutorFactory {
    --- End diff --
   
    where the word Bounded came from? is this a good name?
   
    What about ArrayOrderedExecutor?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
In reply to this post by mtaylor
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1390#discussion_r126428552
 
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils;
    +
    +import java.util.Queue;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
    +import org.jboss.logging.Logger;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscChunkedArrayQueue;
    +import org.jctools.queues.SpscArrayQueue;
    +import org.jctools.queues.SpscChunkedArrayQueue;
    +import org.jctools.util.Pow2;
    +
    +/**
    + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
    + */
    +public final class BoundedOrderedExecutorFactory implements ExecutorFactory {
    --- End diff --
   
    QueueArrayOrderedExecutor



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
In reply to this post by mtaylor
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1390#discussion_r126428819
 
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils;
    +
    +import java.util.Queue;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
    +import org.jboss.logging.Logger;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscChunkedArrayQueue;
    +import org.jctools.queues.SpscArrayQueue;
    +import org.jctools.queues.SpscChunkedArrayQueue;
    +import org.jctools.util.Pow2;
    +
    +/**
    + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
    + */
    +public final class BoundedOrderedExecutorFactory implements ExecutorFactory {
    +
    +   public enum ProducerType {
    +      Single, Multi
    +   }
    +
    +   public static final int DEFAULT_INITIAL_CAPACITY = Integer.getInteger("bounded.executor.initial.capacity", 1024);
    +   public static final int DEFAULT_MAX_CAPACITY = Integer.getInteger("bounded.executor.max.capacity", Pow2.MAX_POW2);
    +   public static final int DEFAULT_MAX_BURST_SIZE = Integer.getInteger("bounded.executor.max.burst", Integer.MAX_VALUE);
    +   private static final Logger logger = Logger.getLogger(BoundedOrderedExecutorFactory.class);
    +
    +   private final ProducerType producerType;
    +   private final Executor parent;
    +   private final int initialCapacity;
    +   private final int maxCapacity;
    +   private final int maxBurstSize;
    +
    +   public BoundedOrderedExecutorFactory(Executor parent) {
    +      this(ProducerType.Multi, parent, DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY, DEFAULT_MAX_BURST_SIZE);
    +   }
    +
    +   /**
    +    * Construct a new instance delegating to the given parent executor.
    +    *
    +    * @param parent the parent executor
    +    */
    +   public BoundedOrderedExecutorFactory(ProducerType producerType,
    +                                        Executor parent,
    +                                        int initialCapacity,
    +                                        int maxCapacity,
    +                                        int maxBurstSize) {
    +      this.producerType = producerType;
    +      this.parent = parent;
    +      this.initialCapacity = initialCapacity;
    +      this.maxCapacity = maxCapacity;
    +      this.maxBurstSize = maxBurstSize;
    +   }
    +
    +   private static int drainCommands(final Queue<Runnable> commands, int maxBurstSize) {
    +      for (int i = 0; i < maxBurstSize; i++) {
    +         final Runnable command = commands.poll();
    +         if (command == null) {
    +            return i;
    +         }
    +         try {
    +            command.run();
    +         } catch (ActiveMQInterruptedException e) {
    +            // This could happen during shutdowns. Nothing to be concerned about here
    +            logger.debug("Interrupted Thread", e);
    +         } catch (Throwable t) {
    +            logger.warn(t.getMessage(), t);
    +         }
    +      }
    +      return maxBurstSize;
    +   }
    +
    +   /**
    +    * Get an executor that always executes tasks in order.
    +    *
    +    * @return an ordered executor
    +    */
    +   @Override
    +   public BoundedExecutor getExecutor() {
    +      if (this.initialCapacity == this.maxCapacity) {
    +         return createFixedExecutor(this.producerType, this.maxCapacity, this.maxBurstSize, this.parent);
    +      } else {
    +         return createGrowableExecutor(this.producerType, this.initialCapacity, this.maxCapacity, this.maxBurstSize, this.parent);
    +      }
    +   }
    +
    +   public static BoundedExecutor createFixedExecutor(ProducerType producerType,
    +                                                     int requiredCapacity,
    +                                                     int maxBurstSize,
    +                                                     Executor parent) {
    +      final Queue<Runnable> queue;
    +      final int capacity;
    +      switch (producerType) {
    +         case Multi:
    +            final MpscArrayQueue<Runnable> mpscChunkedArrayQueue = new MpscArrayQueue<>(requiredCapacity);
    +            queue = mpscChunkedArrayQueue;
    +            capacity = mpscChunkedArrayQueue.capacity();
    +            break;
    +         case Single:
    +            final SpscArrayQueue<Runnable> spscChunkedArrayQueue = new SpscArrayQueue<>(requiredCapacity);
    +            queue = spscChunkedArrayQueue;
    +            capacity = spscChunkedArrayQueue.capacity();
    +            break;
    +         default:
    +            throw new AssertionError("producerType not supported");
    +      }
    +      return new BoundedOrderedExecutor(queue, capacity, maxBurstSize, parent);
    +   }
    +
    +   public static BoundedExecutor createGrowableExecutor(ProducerType producerType,
    +                                                        int initialCapacity,
    +                                                        int maxCapacity,
    +                                                        int maxBurstSize,
    +                                                        Executor parent) {
    +      final Queue<Runnable> queue;
    +      final int capacity;
    +      switch (producerType) {
    +         case Multi:
    +            final MpscChunkedArrayQueue<Runnable> mpscChunkedArrayQueue = new MpscChunkedArrayQueue<>(initialCapacity, maxCapacity);
    +            queue = mpscChunkedArrayQueue;
    +            capacity = mpscChunkedArrayQueue.capacity();
    +            break;
    +         case Single:
    +            final SpscChunkedArrayQueue<Runnable> spscChunkedArrayQueue = new SpscChunkedArrayQueue<>(initialCapacity, maxCapacity);
    +            queue = spscChunkedArrayQueue;
    +            //SpscChunkedArrayQueue doesn't support the MessagePassingQueue interface yet!
    +            capacity = Pow2.roundToPowerOfTwo(maxCapacity);
    +            break;
    +         default:
    +            throw new AssertionError("producerType not supported");
    +      }
    +      return new BoundedOrderedExecutor(queue, capacity, maxBurstSize, parent);
    +   }
    +
    +   /**
    +    * The padding is employed for false sharing protection: separate the cold fields (load only) from the hot one (the state, store and load)
    +    */
    +   private abstract static class BoundedOrderedExecutorL0Pad {
    +
    +      protected long p00, p01, p02, p03, p04, p05, p06;
    +      protected long p10, p11, p12, p13, p14, p15, p16, p17;
    +   }
    +
    +   private abstract static class OrderedExecutorState extends BoundedOrderedExecutorL0Pad {
    +
    +      protected static final int RELEASED = 0;
    +      public static final AtomicIntegerFieldUpdater<OrderedExecutorState> STATE_UPDATER;
    +
    +      static {
    +         STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutorState.class, "state");
    +      }
    +
    +      private volatile int state = 0;
    +
    +   }
    +
    +   private abstract static class BoundedOrderedExecutorL1Pad extends OrderedExecutorState {
    +
    +      protected long p01, p02, p03, p04, p05, p06, p07;
    +      protected long p10, p11, p12, p13, p14, p15, p16, p17;
    +   }
    +
    +   private static final class BoundedOrderedExecutor extends BoundedOrderedExecutorL1Pad implements BoundedExecutor {
    +
    +      private final Queue<Runnable> commands;
    +      private final Runnable executeCommandsTask;
    +      private final Executor delegate;
    +      private final int maxBurstSize;
    +      private final int capacity;
    +
    +      BoundedOrderedExecutor(Queue<Runnable> commands, int capacity, int maxBurstSize, Executor delegate) {
    +         this.commands = commands;
    +         this.executeCommandsTask = this::executeCommands;
    +         this.delegate = delegate;
    +         this.capacity = capacity;
    +         this.maxBurstSize = maxBurstSize;
    +      }
    +
    +      private boolean tryAcquire() {
    +         //much cheaper than CAS when highly contended
    +         final long oldState = STATE_UPDATER.getAndIncrement(this);
    +         final boolean isAcquired = oldState == RELEASED;
    +         return isAcquired;
    +      }
    +
    +      private boolean isReleased() {
    +         return STATE_UPDATER.get(this) == RELEASED;
    +      }
    +
    +      private void release() {
    +         //StoreStore + LoadStore: much cheaper than a volatile store
    +         STATE_UPDATER.lazySet(this, RELEASED);
    +      }
    +
    +      private void executeCommands() {
    +         final Queue<Runnable> commands = this.commands;
    +         final int maxBurstSize = this.maxBurstSize;
    +         //let others consumers to try to acquire the lock and drain the tasks
    +         while (!commands.isEmpty() && tryAcquire()) {
    +            try {
    +               drainCommands(commands, maxBurstSize);
    +            } finally {
    +               release();
    +            }
    +         }
    +      }
    +
    +      @Override
    +      public int capacity() {
    +         return this.capacity;
    +      }
    +
    +      @Override
    +      public int pendingTasks() {
    +         return this.commands.size();
    +      }
    +
    +      @Override
    +      public boolean isEmpty() {
    +         return this.commands.isEmpty();
    +      }
    +
    +      @Override
    +      public boolean tryExecute(Runnable command) {
    +         //no optimisations on recursive offers
    +         if (commands.offer(command)) {
    +            if (isReleased() && !commands.isEmpty()) {
    +               this.delegate.execute(executeCommandsTask);
    +            }
    +            return true;
    +         } else {
    +            return false;
    +         }
    +      }
    +
    +      @Override
    +      public void execute(Runnable command) {
    +         //no optimisations on recursive offers
    +         if (!commands.offer(command)) {
    +            throw new RejectedExecutionException("can't submit the task: max capacity reached");
    --- End diff --
   
    I see where the word bounded came from.. there is a limit?
   
    What about the Growable Queue?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
In reply to this post by mtaylor
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1390#discussion_r126434504
 
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils;
    +
    +import java.util.Queue;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
    +import org.jboss.logging.Logger;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscChunkedArrayQueue;
    +import org.jctools.queues.SpscArrayQueue;
    +import org.jctools.queues.SpscChunkedArrayQueue;
    +import org.jctools.util.Pow2;
    +
    +/**
    + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
    + */
    +public final class BoundedOrderedExecutorFactory implements ExecutorFactory {
    --- End diff --
   
    I can change it :+1:
    The main reason of that name is that the the max capacity (of submittable tasks) is bounded in some way...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
In reply to this post by mtaylor
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1390#discussion_r126435050
 
    --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.utils;
    +
    +import java.util.Queue;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
    +import org.jboss.logging.Logger;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscChunkedArrayQueue;
    +import org.jctools.queues.SpscArrayQueue;
    +import org.jctools.queues.SpscChunkedArrayQueue;
    +import org.jctools.util.Pow2;
    +
    +/**
    + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
    + */
    +public final class BoundedOrderedExecutorFactory implements ExecutorFactory {
    +
    +   public enum ProducerType {
    +      Single, Multi
    +   }
    +
    +   public static final int DEFAULT_INITIAL_CAPACITY = Integer.getInteger("bounded.executor.initial.capacity", 1024);
    +   public static final int DEFAULT_MAX_CAPACITY = Integer.getInteger("bounded.executor.max.capacity", Pow2.MAX_POW2);
    +   public static final int DEFAULT_MAX_BURST_SIZE = Integer.getInteger("bounded.executor.max.burst", Integer.MAX_VALUE);
    +   private static final Logger logger = Logger.getLogger(BoundedOrderedExecutorFactory.class);
    +
    +   private final ProducerType producerType;
    +   private final Executor parent;
    +   private final int initialCapacity;
    +   private final int maxCapacity;
    +   private final int maxBurstSize;
    +
    +   public BoundedOrderedExecutorFactory(Executor parent) {
    +      this(ProducerType.Multi, parent, DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY, DEFAULT_MAX_BURST_SIZE);
    +   }
    +
    +   /**
    +    * Construct a new instance delegating to the given parent executor.
    +    *
    +    * @param parent the parent executor
    +    */
    +   public BoundedOrderedExecutorFactory(ProducerType producerType,
    +                                        Executor parent,
    +                                        int initialCapacity,
    +                                        int maxCapacity,
    +                                        int maxBurstSize) {
    +      this.producerType = producerType;
    +      this.parent = parent;
    +      this.initialCapacity = initialCapacity;
    +      this.maxCapacity = maxCapacity;
    +      this.maxBurstSize = maxBurstSize;
    +   }
    +
    +   private static int drainCommands(final Queue<Runnable> commands, int maxBurstSize) {
    +      for (int i = 0; i < maxBurstSize; i++) {
    +         final Runnable command = commands.poll();
    +         if (command == null) {
    +            return i;
    +         }
    +         try {
    +            command.run();
    +         } catch (ActiveMQInterruptedException e) {
    +            // This could happen during shutdowns. Nothing to be concerned about here
    +            logger.debug("Interrupted Thread", e);
    +         } catch (Throwable t) {
    +            logger.warn(t.getMessage(), t);
    +         }
    +      }
    +      return maxBurstSize;
    +   }
    +
    +   /**
    +    * Get an executor that always executes tasks in order.
    +    *
    +    * @return an ordered executor
    +    */
    +   @Override
    +   public BoundedExecutor getExecutor() {
    +      if (this.initialCapacity == this.maxCapacity) {
    +         return createFixedExecutor(this.producerType, this.maxCapacity, this.maxBurstSize, this.parent);
    +      } else {
    +         return createGrowableExecutor(this.producerType, this.initialCapacity, this.maxCapacity, this.maxBurstSize, this.parent);
    +      }
    +   }
    +
    +   public static BoundedExecutor createFixedExecutor(ProducerType producerType,
    +                                                     int requiredCapacity,
    +                                                     int maxBurstSize,
    +                                                     Executor parent) {
    +      final Queue<Runnable> queue;
    +      final int capacity;
    +      switch (producerType) {
    +         case Multi:
    +            final MpscArrayQueue<Runnable> mpscChunkedArrayQueue = new MpscArrayQueue<>(requiredCapacity);
    +            queue = mpscChunkedArrayQueue;
    +            capacity = mpscChunkedArrayQueue.capacity();
    +            break;
    +         case Single:
    +            final SpscArrayQueue<Runnable> spscChunkedArrayQueue = new SpscArrayQueue<>(requiredCapacity);
    +            queue = spscChunkedArrayQueue;
    +            capacity = spscChunkedArrayQueue.capacity();
    +            break;
    +         default:
    +            throw new AssertionError("producerType not supported");
    +      }
    +      return new BoundedOrderedExecutor(queue, capacity, maxBurstSize, parent);
    +   }
    +
    +   public static BoundedExecutor createGrowableExecutor(ProducerType producerType,
    +                                                        int initialCapacity,
    +                                                        int maxCapacity,
    +                                                        int maxBurstSize,
    +                                                        Executor parent) {
    +      final Queue<Runnable> queue;
    +      final int capacity;
    +      switch (producerType) {
    +         case Multi:
    +            final MpscChunkedArrayQueue<Runnable> mpscChunkedArrayQueue = new MpscChunkedArrayQueue<>(initialCapacity, maxCapacity);
    +            queue = mpscChunkedArrayQueue;
    +            capacity = mpscChunkedArrayQueue.capacity();
    +            break;
    +         case Single:
    +            final SpscChunkedArrayQueue<Runnable> spscChunkedArrayQueue = new SpscChunkedArrayQueue<>(initialCapacity, maxCapacity);
    +            queue = spscChunkedArrayQueue;
    +            //SpscChunkedArrayQueue doesn't support the MessagePassingQueue interface yet!
    +            capacity = Pow2.roundToPowerOfTwo(maxCapacity);
    +            break;
    +         default:
    +            throw new AssertionError("producerType not supported");
    +      }
    +      return new BoundedOrderedExecutor(queue, capacity, maxBurstSize, parent);
    +   }
    +
    +   /**
    +    * The padding is employed for false sharing protection: separate the cold fields (load only) from the hot one (the state, store and load)
    +    */
    +   private abstract static class BoundedOrderedExecutorL0Pad {
    +
    +      protected long p00, p01, p02, p03, p04, p05, p06;
    +      protected long p10, p11, p12, p13, p14, p15, p16, p17;
    +   }
    +
    +   private abstract static class OrderedExecutorState extends BoundedOrderedExecutorL0Pad {
    +
    +      protected static final int RELEASED = 0;
    +      public static final AtomicIntegerFieldUpdater<OrderedExecutorState> STATE_UPDATER;
    +
    +      static {
    +         STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutorState.class, "state");
    +      }
    +
    +      private volatile int state = 0;
    +
    +   }
    +
    +   private abstract static class BoundedOrderedExecutorL1Pad extends OrderedExecutorState {
    +
    +      protected long p01, p02, p03, p04, p05, p06, p07;
    +      protected long p10, p11, p12, p13, p14, p15, p16, p17;
    +   }
    +
    +   private static final class BoundedOrderedExecutor extends BoundedOrderedExecutorL1Pad implements BoundedExecutor {
    +
    +      private final Queue<Runnable> commands;
    +      private final Runnable executeCommandsTask;
    +      private final Executor delegate;
    +      private final int maxBurstSize;
    +      private final int capacity;
    +
    +      BoundedOrderedExecutor(Queue<Runnable> commands, int capacity, int maxBurstSize, Executor delegate) {
    +         this.commands = commands;
    +         this.executeCommandsTask = this::executeCommands;
    +         this.delegate = delegate;
    +         this.capacity = capacity;
    +         this.maxBurstSize = maxBurstSize;
    +      }
    +
    +      private boolean tryAcquire() {
    +         //much cheaper than CAS when highly contended
    +         final long oldState = STATE_UPDATER.getAndIncrement(this);
    +         final boolean isAcquired = oldState == RELEASED;
    +         return isAcquired;
    +      }
    +
    +      private boolean isReleased() {
    +         return STATE_UPDATER.get(this) == RELEASED;
    +      }
    +
    +      private void release() {
    +         //StoreStore + LoadStore: much cheaper than a volatile store
    +         STATE_UPDATER.lazySet(this, RELEASED);
    +      }
    +
    +      private void executeCommands() {
    +         final Queue<Runnable> commands = this.commands;
    +         final int maxBurstSize = this.maxBurstSize;
    +         //let others consumers to try to acquire the lock and drain the tasks
    +         while (!commands.isEmpty() && tryAcquire()) {
    +            try {
    +               drainCommands(commands, maxBurstSize);
    +            } finally {
    +               release();
    +            }
    +         }
    +      }
    +
    +      @Override
    +      public int capacity() {
    +         return this.capacity;
    +      }
    +
    +      @Override
    +      public int pendingTasks() {
    +         return this.commands.size();
    +      }
    +
    +      @Override
    +      public boolean isEmpty() {
    +         return this.commands.isEmpty();
    +      }
    +
    +      @Override
    +      public boolean tryExecute(Runnable command) {
    +         //no optimisations on recursive offers
    +         if (commands.offer(command)) {
    +            if (isReleased() && !commands.isEmpty()) {
    +               this.delegate.execute(executeCommandsTask);
    +            }
    +            return true;
    +         } else {
    +            return false;
    +         }
    +      }
    +
    +      @Override
    +      public void execute(Runnable command) {
    +         //no optimisations on recursive offers
    +         if (!commands.offer(command)) {
    +            throw new RejectedExecutionException("can't submit the task: max capacity reached");
    --- End diff --
   
    Exactly!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis issue #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
In reply to this post by mtaylor
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1390
 
    What about simply change OrderedExecutorFactory /OrderedExecutor use the queue you want?
    Maybe even pass in the queue as a parameter?
   
   
    I'm sending a PR to an actorExecutor, if you could make your changes in top of that one?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis issue #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
In reply to this post by mtaylor
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1390
 
    so, if you could make a change to #1395 after merged? (if you agree with that PR), in such way that ProcessorBase would take the Queue type that you want.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] activemq-artemis pull request #1390: ARTEMIS-1273 Bounded OrderedExecutor

mtaylor
In reply to this post by mtaylor
Github user franz1981 closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/1390


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Loading...