[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
GitHub user franz1981 opened a pull request:

    https://github.com/apache/activemq-artemis/pull/2494

    ARTEMIS-2224 Reduce contention on LivePageCacheImpl

    It includes:
   
    - **lock-free LivePageCache + tests**:
    LivePageCacheImpl has been reimplemented to be
    lock-free, multi-producer and multi-consumer
    in any of its operations (a simplified sketch of the chunked
    append idea is shown after this list).
    - **Avoid unnecessary page cache queries on ack TX**:
    PageSubscriptionImpl::ackTx is already performing a counter update
    using the message persistent size: the size can be reused on
    PagePosition::setPersistentSize, avoiding querying the page cache just
    to compute it.
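
    A minimal, simplified sketch of the chunked lock-free append idea described
    above; the class name, chunk size and bounds handling are illustrative
    assumptions for this thread, not the PR's actual code:

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReferenceArray;

    final class ChunkedAppendOnlyList<T> {

       private static final int CHUNK_SIZE = 32;   // illustrative chunk capacity
       private static final long RESIZING = -1L;   // sentinel while a new chunk is being linked

       private static final class Chunk<T> extends AtomicReferenceArray<T> {
          final Chunk<T> prev;
          volatile Chunk<T> next;
          Chunk(Chunk<T> prev) {
             super(CHUNK_SIZE);
             this.prev = prev;
          }
       }

       private final AtomicLong producerIndex = new AtomicLong();
       private volatile Chunk<T> producerChunk = new Chunk<>(null);
       private final Chunk<T> firstChunk = producerChunk;

       void add(T element) {
          while (true) {
             final long index = producerIndex.get();
             if (index != RESIZING) {
                // read before the CAS: if the CAS wins, this chunk matches the observed index
                final Chunk<T> chunk = producerChunk;
                final int offset = (int) (index % CHUNK_SIZE);
                if (offset == 0 && index != 0) {
                   // first slot of a new chunk: one producer wins the right to link it
                   if (producerIndex.compareAndSet(index, RESIZING)) {
                      final Chunk<T> newChunk = new Chunk<>(chunk);
                      newChunk.lazySet(0, element);
                      chunk.next = newChunk;
                      producerChunk = newChunk;
                      producerIndex.lazySet(index + 1);   // release store: unblocks spinning threads
                      return;
                   }
                } else if (producerIndex.compareAndSet(index, index + 1)) {
                   chunk.lazySet(offset, element);        // readers spin on null until this store lands
                   return;
                }
             }
             Thread.yield();   // another producer is linking a chunk
          }
       }

       // i must be below a previously observed producerIndex value
       T get(int i) {
          Chunk<T> chunk = firstChunk;
          for (int jumps = i / CHUNK_SIZE; jumps > 0; jumps--) {
             chunk = chunk.next;   // next is linked before producerIndex passes the chunk boundary
          }
          T value;
          while ((value = chunk.get(i % CHUNK_SIZE)) == null) {
             Thread.yield();       // the slot's store may still be in flight
          }
          return value;
       }
    }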

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/franz1981/activemq-artemis lock-free-paging

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/2494.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2494
   
----

----


---

[GitHub] activemq-artemis issue #2494: ARTEMIS-2224 Reduce contention on LivePageCach...

asfgit
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2494
 
    The effects of the contention on `LivePageCacheImpl` are more visible after applying https://github.com/apache/activemq-artemis/pull/2484


---

[GitHub] activemq-artemis issue #2494: ARTEMIS-2224 Reduce contention on LivePageCach...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2494
 
    @michaelandrepearce @wy96f @qihongxu Please review and test :+1:


---

[GitHub] activemq-artemis issue #2494: ARTEMIS-2224 Reduce contention on LivePageCach...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2494
 
    Most of the benefits on memory footprint are already explained in https://github.com/qihongxu/activemq-artemis/pull/1, while regarding contention I have already built several contention graphs showing that this implementation scales linearly with the number of producers on a topic, without any contention.
    In addition, several JMH microbenchmarks (not attached here) show that it is always faster than the original one (even single-threaded), for any operation and in any condition.
    The only missing bit is an end-to-end test after applying #2484


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246558406
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -28,15 +28,49 @@
     /**
      * This is the same as PageCache, however this is for the page that's being currently written.
      */
    --- End diff --
   
    Isn't this mixing the collection implementation itself into the LivePageCache?
   
    Isn't there a way to implement this logic in its own structure? Like PageCache using a generic ChunkArray (a name I just came up with here)?
   
    I'm a bit concerned about maintaining the business side of this issue (that is, the PageCache) together with the speedy implementation of a collection.
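
    A minimal sketch of the split being suggested here: the lock-free chunked
    storage extracted into a generic collection, with the page cache keeping only
    the paging logic. The ConcurrentChunkedList name and its methods are
    illustrative assumptions, not existing Artemis classes:

    // Artemis types quoted in the diff above; package names assumed from the module under review
    import org.apache.activemq.artemis.core.paging.PagedMessage;
    import org.apache.activemq.artemis.core.server.LargeServerMessage;

    final class LivePageCacheSketch {

       // illustrative placeholder for the extracted, generic collection
       interface ConcurrentChunkedList<T> {
          void add(T element);
          T get(int index);
          int size();
       }

       private final ConcurrentChunkedList<PagedMessage> messages;

       LivePageCacheSketch(ConcurrentChunkedList<PagedMessage> messages) {
          this.messages = messages;
       }

       // the cache keeps only the paging "business" logic...
       void addLiveMessage(PagedMessage message) {
          if (message.getMessage().isLargeMessage()) {
             ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
          }
          messages.add(message);               // ...and delegates the concurrent storage
       }

       PagedMessage getMessage(int messageNumber) {
          return messages.get(messageNumber);  // no CAS/chunk details leak into the cache
       }

       int getNumberOfMessages() {
          return messages.size();
       }
    }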


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246563763
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -28,15 +28,49 @@
     /**
      * This is the same as PageCache, however this is for the page that's being currently written.
      */
    --- End diff --
   
    @franz1981 I'd agree with @clebertsuconic here. A bit like what I've done with priority consumers: I ended up splitting out the collections logic, which has ended up making it cleaner and easier to reason about (and, as you've pointed out on its PR, more testable ;) )


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246566042
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    +      //size is never allowed to be > Integer.MAX_VALUE
    +      final int lastChunkIndex = (int) size >> chunkSizeLog2;
    +      int requiredJumps = jumps;
    +      AtomicChunk<PagedMessage> jumpBuffer = null;
    +      boolean jumpForward = true;
    +      int distanceFromLastChunkIndex = lastChunkIndex - jumps;
    +      //it's worth to go backward from lastChunkIndex?
    +      //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
    +      if (distanceFromLastChunkIndex < jumps) {
    +         final AtomicChunk<PagedMessage> producer = producerBuffer;
    +         //producer is a potential moving, always increasing, target ie better to re-check the distance
    +         distanceFromLastChunkIndex = producer.index - jumps;
    +         if (distanceFromLastChunkIndex < jumps) {
    +            //we're saving some jumps ie is fine to go backward from here
    +            jumpBuffer = producer;
    +            requiredJumps = distanceFromLastChunkIndex;
    +            jumpForward = false;
    +         }
    +      }
    +      //start from the consumer buffer only is needed
    +      if (jumpBuffer == null) {
    +         jumpBuffer = consumerBuffer;
    +      }
    +      for (int i = 0; i < requiredJumps; i++) {
    +         //next chunk is always set if below a read producerIndex value
    +         //previous chunk is final and can be safely read
    +         jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
    +      }
    +      return jumpBuffer;
        }
     
        @Override
    -   public synchronized boolean isLive() {
    +   public boolean isLive() {
           return isLive;
        }
     
        @Override
    -   public synchronized void addLiveMessage(PagedMessage message) {
    +   public void addLiveMessage(PagedMessage message) {
           if (message.getMessage().isLargeMessage()) {
              ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
           }
    -      this.messages.add(message);
    +      while (true) {
    +         final long pIndex = producerIndex;
    +         if (pIndex != RESIZING) {
    +            if (pIndex == Integer.MAX_VALUE) {
    +               throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
    +            }
    +            //load acquire the current producer buffer
    +            final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
    +            final int pOffset = (int) (pIndex & chunkMask);
    +            //only the first message to a chunk can attempt to resize
    +            if (pOffset == 0) {
    +               if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
    +                  return;
    +               }
    +            } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
    +               //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
    +               //NOTE: producerIndex is being updated before setting a new value
    +               producerBuffer.lazySet(pOffset, message);
    +               return;
    +            }
    +         }
    +         Thread.yield();
    +      }
    +   }
    +
    +   private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
    +      if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
    +         return false;
    +      }
    +      final AtomicChunk<PagedMessage> newChunk;
    +      try {
    +         final int index = (int) (pIndex >> chunkSizeLog2);
    +         newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
    +      } catch (OutOfMemoryError oom) {
    +         //unblock producerIndex without updating it
    +         PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
    +         throw oom;
    +      }
    +      //adding the message to it
    +      newChunk.lazySet(0, message);
    +      //linking it to the old one, if any
    +      if (producerBuffer != null) {
    +         //a plain store is enough, given that producerIndex prevents any reader/writer to access it
    +         producerBuffer.next = newChunk;
    +      } else {
    +         //it's first one
    +         this.consumerBuffer = newChunk;
    +      }
    +      //making it the current produced one
    +      this.producerBuffer = newChunk;
    +      //store release any previous write and "unblock" anyone waiting resizing to finish
    +      PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
    +      return true;
        }
     
        @Override
    -   public synchronized void close() {
    +   public void close() {
           logger.tracef("Closing %s", this);
           this.isLive = false;
    --- End diff --
   
    Now that isLive is volatile and the synchronization is removed, should this be updated via an atomic field updater instead?


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567905
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -28,15 +28,49 @@
     /**
      * This is the same as PageCache, however this is for the page that's being currently written.
      */
    --- End diff --
   
    Makes sense :+1:


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    +      //size is never allowed to be > Integer.MAX_VALUE
    +      final int lastChunkIndex = (int) size >> chunkSizeLog2;
    +      int requiredJumps = jumps;
    +      AtomicChunk<PagedMessage> jumpBuffer = null;
    +      boolean jumpForward = true;
    +      int distanceFromLastChunkIndex = lastChunkIndex - jumps;
    +      //it's worth to go backward from lastChunkIndex?
    +      //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
    +      if (distanceFromLastChunkIndex < jumps) {
    +         final AtomicChunk<PagedMessage> producer = producerBuffer;
    +         //producer is a potential moving, always increasing, target ie better to re-check the distance
    +         distanceFromLastChunkIndex = producer.index - jumps;
    +         if (distanceFromLastChunkIndex < jumps) {
    +            //we're saving some jumps ie is fine to go backward from here
    +            jumpBuffer = producer;
    +            requiredJumps = distanceFromLastChunkIndex;
    +            jumpForward = false;
    +         }
    +      }
    +      //start from the consumer buffer only is needed
    +      if (jumpBuffer == null) {
    +         jumpBuffer = consumerBuffer;
    +      }
    +      for (int i = 0; i < requiredJumps; i++) {
    +         //next chunk is always set if below a read producerIndex value
    +         //previous chunk is final and can be safely read
    +         jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
    +      }
    +      return jumpBuffer;
        }
     
        @Override
    -   public synchronized boolean isLive() {
    +   public boolean isLive() {
           return isLive;
        }
     
        @Override
    -   public synchronized void addLiveMessage(PagedMessage message) {
    +   public void addLiveMessage(PagedMessage message) {
           if (message.getMessage().isLargeMessage()) {
              ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
           }
    -      this.messages.add(message);
    +      while (true) {
    +         final long pIndex = producerIndex;
    +         if (pIndex != RESIZING) {
    +            if (pIndex == Integer.MAX_VALUE) {
    +               throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
    +            }
    +            //load acquire the current producer buffer
    +            final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
    +            final int pOffset = (int) (pIndex & chunkMask);
    +            //only the first message to a chunk can attempt to resize
    +            if (pOffset == 0) {
    +               if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
    +                  return;
    +               }
    +            } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
    +               //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
    +               //NOTE: producerIndex is being updated before setting a new value
    +               producerBuffer.lazySet(pOffset, message);
    +               return;
    +            }
    +         }
    +         Thread.yield();
    +      }
    +   }
    +
    +   private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
    +      if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
    +         return false;
    +      }
    +      final AtomicChunk<PagedMessage> newChunk;
    +      try {
    +         final int index = (int) (pIndex >> chunkSizeLog2);
    +         newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
    +      } catch (OutOfMemoryError oom) {
    +         //unblock producerIndex without updating it
    +         PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
    +         throw oom;
    +      }
    +      //adding the message to it
    +      newChunk.lazySet(0, message);
    +      //linking it to the old one, if any
    +      if (producerBuffer != null) {
    +         //a plain store is enough, given that producerIndex prevents any reader/writer to access it
    +         producerBuffer.next = newChunk;
    +      } else {
    +         //it's first one
    +         this.consumerBuffer = newChunk;
    +      }
    +      //making it the current produced one
    +      this.producerBuffer = newChunk;
    +      //store release any previous write and "unblock" anyone waiting resizing to finish
    +      PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
    +      return true;
        }
     
        @Override
    -   public synchronized void close() {
    +   public void close() {
           logger.tracef("Closing %s", this);
           this.isLive = false;
        }
     
    +   private static PagedMessage[] EMPTY_MSG = null;
    +
    +   private static PagedMessage[] noMessages() {
    +      //it is a benign race: no need strong initializations here
    +      PagedMessage[] empty = EMPTY_MSG;
    +      if (empty != null) {
    +         return empty;
    +      } else {
    +         empty = new PagedMessage[0];
    --- End diff --
   
    Why not simply make noMessages return a static empty array, e.g. a shared PagedMessage[0] constant, similar in nature to https://android.googlesource.com/platform/libcore/+/jb-mr2-release/luni/src/main/java/libcore/util/EmptyArray.java? That saves instantiating it every time.
   
    It also avoids a race condition that this current code seems to have with its lazy initialization: currently, if two threads call noMessages concurrently, it is possible that two PagedMessage[0] objects get created.
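
    A minimal sketch of the shared constant being suggested (the field name is
    illustrative): one immutable empty array, no lazy initialization, no race:

    private static final PagedMessage[] NO_MESSAGES = new PagedMessage[0];

    private static PagedMessage[] noMessages() {
       return NO_MESSAGES;   // safe to share: a zero-length array is immutable
    }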


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568367
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    +      //size is never allowed to be > Integer.MAX_VALUE
    +      final int lastChunkIndex = (int) size >> chunkSizeLog2;
    +      int requiredJumps = jumps;
    +      AtomicChunk<PagedMessage> jumpBuffer = null;
    +      boolean jumpForward = true;
    +      int distanceFromLastChunkIndex = lastChunkIndex - jumps;
    +      //it's worth to go backward from lastChunkIndex?
    +      //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
    +      if (distanceFromLastChunkIndex < jumps) {
    +         final AtomicChunk<PagedMessage> producer = producerBuffer;
    +         //producer is a potential moving, always increasing, target ie better to re-check the distance
    +         distanceFromLastChunkIndex = producer.index - jumps;
    +         if (distanceFromLastChunkIndex < jumps) {
    +            //we're saving some jumps ie is fine to go backward from here
    +            jumpBuffer = producer;
    +            requiredJumps = distanceFromLastChunkIndex;
    +            jumpForward = false;
    +         }
    +      }
    +      //start from the consumer buffer only is needed
    +      if (jumpBuffer == null) {
    +         jumpBuffer = consumerBuffer;
    +      }
    +      for (int i = 0; i < requiredJumps; i++) {
    +         //next chunk is always set if below a read producerIndex value
    +         //previous chunk is final and can be safely read
    +         jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
    +      }
    +      return jumpBuffer;
        }
     
        @Override
    -   public synchronized boolean isLive() {
    +   public boolean isLive() {
           return isLive;
        }
     
        @Override
    -   public synchronized void addLiveMessage(PagedMessage message) {
    +   public void addLiveMessage(PagedMessage message) {
           if (message.getMessage().isLargeMessage()) {
              ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
           }
    -      this.messages.add(message);
    +      while (true) {
    +         final long pIndex = producerIndex;
    +         if (pIndex != RESIZING) {
    +            if (pIndex == Integer.MAX_VALUE) {
    +               throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
    +            }
    +            //load acquire the current producer buffer
    +            final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
    +            final int pOffset = (int) (pIndex & chunkMask);
    +            //only the first message to a chunk can attempt to resize
    +            if (pOffset == 0) {
    +               if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
    +                  return;
    +               }
    +            } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
    +               //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
    +               //NOTE: producerIndex is being updated before setting a new value
    +               producerBuffer.lazySet(pOffset, message);
    +               return;
    +            }
    +         }
    +         Thread.yield();
    +      }
    +   }
    +
    +   private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
    +      if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
    +         return false;
    +      }
    +      final AtomicChunk<PagedMessage> newChunk;
    +      try {
    +         final int index = (int) (pIndex >> chunkSizeLog2);
    +         newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
    +      } catch (OutOfMemoryError oom) {
    +         //unblock producerIndex without updating it
    +         PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
    +         throw oom;
    +      }
    +      //adding the message to it
    +      newChunk.lazySet(0, message);
    +      //linking it to the old one, if any
    +      if (producerBuffer != null) {
    +         //a plain store is enough, given that producerIndex prevents any reader/writer to access it
    +         producerBuffer.next = newChunk;
    +      } else {
    +         //it's first one
    +         this.consumerBuffer = newChunk;
    +      }
    +      //making it the current produced one
    +      this.producerBuffer = newChunk;
    +      //store release any previous write and "unblock" anyone waiting resizing to finish
    +      PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
    +      return true;
        }
     
        @Override
    -   public synchronized void close() {
    +   public void close() {
           logger.tracef("Closing %s", this);
           this.isLive = false;
        }
     
    +   private static PagedMessage[] EMPTY_MSG = null;
    +
    +   private static PagedMessage[] noMessages() {
    +      //it is a benign race: no need strong initializations here
    +      PagedMessage[] empty = EMPTY_MSG;
    +      if (empty != null) {
    +         return empty;
    +      } else {
    +         empty = new PagedMessage[0];
    --- End diff --
   
    Yep, I have done it on purpose: it is a benign race, because there is only a very low chance that a new empty array would be allocated more than once, but I agree :+1:
    It will make the code much simpler


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568552
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    +      //size is never allowed to be > Integer.MAX_VALUE
    +      final int lastChunkIndex = (int) size >> chunkSizeLog2;
    +      int requiredJumps = jumps;
    +      AtomicChunk<PagedMessage> jumpBuffer = null;
    +      boolean jumpForward = true;
    +      int distanceFromLastChunkIndex = lastChunkIndex - jumps;
    +      //it's worth to go backward from lastChunkIndex?
    +      //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
    +      if (distanceFromLastChunkIndex < jumps) {
    +         final AtomicChunk<PagedMessage> producer = producerBuffer;
    +         //producer is a potential moving, always increasing, target ie better to re-check the distance
    +         distanceFromLastChunkIndex = producer.index - jumps;
    +         if (distanceFromLastChunkIndex < jumps) {
    +            //we're saving some jumps ie is fine to go backward from here
    +            jumpBuffer = producer;
    +            requiredJumps = distanceFromLastChunkIndex;
    +            jumpForward = false;
    +         }
    +      }
    +      //start from the consumer buffer only is needed
    +      if (jumpBuffer == null) {
    +         jumpBuffer = consumerBuffer;
    +      }
    +      for (int i = 0; i < requiredJumps; i++) {
    +         //next chunk is always set if below a read producerIndex value
    +         //previous chunk is final and can be safely read
    +         jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
    +      }
    +      return jumpBuffer;
        }
     
        @Override
    -   public synchronized boolean isLive() {
    +   public boolean isLive() {
           return isLive;
        }
     
        @Override
    -   public synchronized void addLiveMessage(PagedMessage message) {
    +   public void addLiveMessage(PagedMessage message) {
           if (message.getMessage().isLargeMessage()) {
              ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
           }
    -      this.messages.add(message);
    +      while (true) {
    +         final long pIndex = producerIndex;
    +         if (pIndex != RESIZING) {
    +            if (pIndex == Integer.MAX_VALUE) {
    +               throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
    +            }
    +            //load acquire the current producer buffer
    +            final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
    +            final int pOffset = (int) (pIndex & chunkMask);
    +            //only the first message to a chunk can attempt to resize
    +            if (pOffset == 0) {
    +               if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
    +                  return;
    +               }
    +            } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
    +               //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
    +               //NOTE: producerIndex is being updated before setting a new value
    +               producerBuffer.lazySet(pOffset, message);
    +               return;
    +            }
    +         }
    +         Thread.yield();
    +      }
    +   }
    +
    +   private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
    +      if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
    +         return false;
    +      }
    +      final AtomicChunk<PagedMessage> newChunk;
    +      try {
    +         final int index = (int) (pIndex >> chunkSizeLog2);
    +         newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
    +      } catch (OutOfMemoryError oom) {
    +         //unblock producerIndex without updating it
    +         PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
    +         throw oom;
    +      }
    +      //adding the message to it
    +      newChunk.lazySet(0, message);
    +      //linking it to the old one, if any
    +      if (producerBuffer != null) {
    +         //a plain store is enough, given that producerIndex prevents any reader/writer to access it
    +         producerBuffer.next = newChunk;
    +      } else {
    +         //it's first one
    +         this.consumerBuffer = newChunk;
    +      }
    +      //making it the current produced one
    +      this.producerBuffer = newChunk;
    +      //store release any previous write and "unblock" anyone waiting resizing to finish
    +      PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
    +      return true;
        }
     
        @Override
    -   public synchronized void close() {
    +   public void close() {
           logger.tracef("Closing %s", this);
           this.isLive = false;
    --- End diff --
   
    No need: `isLive = false` is a volatile set, exactly like using the updater
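
    For reference, a small sketch of the point above: a plain volatile write and
    the field updater's set have the same memory semantics; the updater only adds
    value when CAS or lazySet is needed. Names are illustrative, and an int field
    is used because there is no AtomicBooleanFieldUpdater:

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

    final class LiveFlag {

       private static final AtomicIntegerFieldUpdater<LiveFlag> LIVE_UPDATER =
          AtomicIntegerFieldUpdater.newUpdater(LiveFlag.class, "live");

       private volatile int live = 1;   // field updaters require a volatile field

       void closeWithPlainWrite() {
          live = 0;                     // volatile store
       }

       void closeWithUpdater() {
          LIVE_UPDATER.set(this, 0);    // the same volatile store, through the updater
       }
    }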


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568579
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    --- End diff --
   
    The naming of fields is misaligned: in some places it is called size, in others you call it index. This is hard to follow.


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568743
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    --- End diff --
   
    Surely if this occurs, there's some issue...


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569523
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    +      //size is never allowed to be > Integer.MAX_VALUE
    +      final int lastChunkIndex = (int) size >> chunkSizeLog2;
    +      int requiredJumps = jumps;
    +      AtomicChunk<PagedMessage> jumpBuffer = null;
    +      boolean jumpForward = true;
    +      int distanceFromLastChunkIndex = lastChunkIndex - jumps;
    +      //it's worth to go backward from lastChunkIndex?
    +      //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
    +      if (distanceFromLastChunkIndex < jumps) {
    +         final AtomicChunk<PagedMessage> producer = producerBuffer;
    +         //producer is a potential moving, always increasing, target ie better to re-check the distance
    +         distanceFromLastChunkIndex = producer.index - jumps;
    +         if (distanceFromLastChunkIndex < jumps) {
    +            //we're saving some jumps ie is fine to go backward from here
    +            jumpBuffer = producer;
    +            requiredJumps = distanceFromLastChunkIndex;
    +            jumpForward = false;
    +         }
    +      }
    +      //start from the consumer buffer only is needed
    +      if (jumpBuffer == null) {
    +         jumpBuffer = consumerBuffer;
    +      }
    +      for (int i = 0; i < requiredJumps; i++) {
    +         //next chunk is always set if below a read producerIndex value
    +         //previous chunk is final and can be safely read
    +         jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
    +      }
    +      return jumpBuffer;
        }
     
        @Override
    -   public synchronized boolean isLive() {
    +   public boolean isLive() {
           return isLive;
        }
     
        @Override
    -   public synchronized void addLiveMessage(PagedMessage message) {
    +   public void addLiveMessage(PagedMessage message) {
           if (message.getMessage().isLargeMessage()) {
              ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
           }
    -      this.messages.add(message);
    +      while (true) {
    +         final long pIndex = producerIndex;
    +         if (pIndex != RESIZING) {
    +            if (pIndex == Integer.MAX_VALUE) {
    +               throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
    +            }
    +            //load acquire the current producer buffer
    +            final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
    +            final int pOffset = (int) (pIndex & chunkMask);
    +            //only the first message to a chunk can attempt to resize
    +            if (pOffset == 0) {
    +               if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
    +                  return;
    +               }
    +            } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
    +               //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
    +               //NOTE: producerIndex is being updated before setting a new value
    +               producerBuffer.lazySet(pOffset, message);
    +               return;
    +            }
    +         }
    +         Thread.yield();
    +      }
    +   }
    +
    +   private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
    +      if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
    +         return false;
    +      }
    +      final AtomicChunk<PagedMessage> newChunk;
    +      try {
    +         final int index = (int) (pIndex >> chunkSizeLog2);
    +         newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
    +      } catch (OutOfMemoryError oom) {
    +         //unblock producerIndex without updating it
    +         PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
    +         throw oom;
    +      }
    +      //adding the message to it
    +      newChunk.lazySet(0, message);
    +      //linking it to the old one, if any
    +      if (producerBuffer != null) {
    +         //a plain store is enough, given that producerIndex prevents any reader/writer to access it
    +         producerBuffer.next = newChunk;
    +      } else {
    +         //it's first one
    +         this.consumerBuffer = newChunk;
    +      }
    +      //making it the current produced one
    +      this.producerBuffer = newChunk;
    +      //store release any previous write and "unblock" anyone waiting resizing to finish
    +      PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
    +      return true;
        }
     
        @Override
    -   public synchronized void close() {
    +   public void close() {
           logger.tracef("Closing %s", this);
           this.isLive = false;
        }
     
    +   private static PagedMessage[] EMPTY_MSG = null;
    +
    +   private static PagedMessage[] noMessages() {
    +      //it is a benign race: no need strong initializations here
    +      PagedMessage[] empty = EMPTY_MSG;
    +      if (empty != null) {
    +         return empty;
    +      } else {
    +         empty = new PagedMessage[0];
    +         EMPTY_MSG = empty;
    +      }
    +      return empty;
    +   }
    +
        @Override
    -   public synchronized PagedMessage[] getMessages() {
    -      return messages.toArray(new PagedMessage[messages.size()]);
    +   public PagedMessage[] getMessages() {
    +      long currentSize;
    +      while ((currentSize = producerIndex) == RESIZING) {
    --- End diff --
   
    Seems there are sections of code duplicated from getMessage(int)


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569626
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    --- End diff --
   
    I know, but in the original version it was handled that way, and it covers 2 cases: the collection is null, and the collection doesn't have enough elements


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570293
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---
    @@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) {
           return info;
        }
     
    +   private void installTXCallback(final Transaction tx, final PagePosition position) {
    +      installTXCallback(tx, position, -1);
    +   }
    +
        /**
         * @param tx
         * @param position
    +    * @param persistentSize if negative it needs to be calculated on the fly
         */
    -   private void installTXCallback(final Transaction tx, final PagePosition position) {
    +   private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) {
           if (position.getRecordID() >= 0) {
              // It needs to persist, otherwise the cursor will return to the fist page position
              tx.setContainsPersistent();
           }
     
           PageCursorInfo info = getPageInfo(position);
           PageCache cache = info.getCache();
    -      long size = 0;
           if (cache != null) {
    -         size = getPersistentSize(cache.getMessage(position.getMessageNr()));
    +         final long size;
    +         if (persistentSize < 0) {
    --- End diff --
   
    Surely this is checking for something specific like -1, not just any negative value (which would be worrying)? If so, this should be more explicit and check only for -1; any other negative value means an illegal argument
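
    As a minimal sketch of that suggestion (the class and constant names here are hypothetical, not from the PR), the sentinel could be named and anything else negative rejected up front:

        final class PersistentSizeCheck {
           // assumed sentinel name for "compute the size from the page cache"
           static final long UNKNOWN_PERSISTENT_SIZE = -1;

           static void check(long persistentSize) {
              if (persistentSize < 0 && persistentSize != UNKNOWN_PERSISTENT_SIZE) {
                 throw new IllegalArgumentException(
                    "persistentSize must be >= 0 or the -1 sentinel, got: " + persistentSize);
              }
           }
        }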


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570552
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---
    @@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) {
           return info;
        }
     
    +   private void installTXCallback(final Transaction tx, final PagePosition position) {
    +      installTXCallback(tx, position, -1);
    +   }
    +
        /**
         * @param tx
         * @param position
    +    * @param persistentSize if negative it needs to be calculated on the fly
         */
    -   private void installTXCallback(final Transaction tx, final PagePosition position) {
    +   private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) {
           if (position.getRecordID() >= 0) {
              // It needs to persist, otherwise the cursor will return to the fist page position
              tx.setContainsPersistent();
           }
     
           PageCursorInfo info = getPageInfo(position);
           PageCache cache = info.getCache();
    -      long size = 0;
           if (cache != null) {
    -         size = getPersistentSize(cache.getMessage(position.getMessageNr()));
    +         final long size;
    +         if (persistentSize < 0) {
    --- End diff --
   
    -1 is used as a reserved value at another call site to trigger the cache lookup


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246574775
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    +      //size is never allowed to be > Integer.MAX_VALUE
    +      final int lastChunkIndex = (int) size >> chunkSizeLog2;
    +      int requiredJumps = jumps;
    +      AtomicChunk<PagedMessage> jumpBuffer = null;
    +      boolean jumpForward = true;
    +      int distanceFromLastChunkIndex = lastChunkIndex - jumps;
    +      //it's worth to go backward from lastChunkIndex?
    +      //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
    +      if (distanceFromLastChunkIndex < jumps) {
    +         final AtomicChunk<PagedMessage> producer = producerBuffer;
    +         //producer is a potential moving, always increasing, target ie better to re-check the distance
    +         distanceFromLastChunkIndex = producer.index - jumps;
    +         if (distanceFromLastChunkIndex < jumps) {
    +            //we're saving some jumps ie is fine to go backward from here
    +            jumpBuffer = producer;
    +            requiredJumps = distanceFromLastChunkIndex;
    +            jumpForward = false;
    +         }
    +      }
    +      //start from the consumer buffer only is needed
    +      if (jumpBuffer == null) {
    +         jumpBuffer = consumerBuffer;
    +      }
    +      for (int i = 0; i < requiredJumps; i++) {
    +         //next chunk is always set if below a read producerIndex value
    +         //previous chunk is final and can be safely read
    +         jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
    +      }
    +      return jumpBuffer;
        }
     
        @Override
    -   public synchronized boolean isLive() {
    +   public boolean isLive() {
           return isLive;
        }
     
        @Override
    -   public synchronized void addLiveMessage(PagedMessage message) {
    +   public void addLiveMessage(PagedMessage message) {
           if (message.getMessage().isLargeMessage()) {
              ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount();
           }
    -      this.messages.add(message);
    +      while (true) {
    +         final long pIndex = producerIndex;
    +         if (pIndex != RESIZING) {
    +            if (pIndex == Integer.MAX_VALUE) {
    +               throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages");
    +            }
    +            //load acquire the current producer buffer
    +            final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer;
    +            final int pOffset = (int) (pIndex & chunkMask);
    +            //only the first message to a chunk can attempt to resize
    +            if (pOffset == 0) {
    +               if (appendChunkAndMessage(producerBuffer, pIndex, message)) {
    +                  return;
    +               }
    +            } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) {
    +               //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic
    +               //NOTE: producerIndex is being updated before setting a new value
    +               producerBuffer.lazySet(pOffset, message);
    +               return;
    +            }
    +         }
    +         Thread.yield();
    +      }
    +   }
    +
    +   private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) {
    +      if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
    +         return false;
    +      }
    +      final AtomicChunk<PagedMessage> newChunk;
    +      try {
    +         final int index = (int) (pIndex >> chunkSizeLog2);
    +         newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
    +      } catch (OutOfMemoryError oom) {
    +         //unblock producerIndex without updating it
    +         PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
    +         throw oom;
    +      }
    +      //adding the message to it
    +      newChunk.lazySet(0, message);
    +      //linking it to the old one, if any
    +      if (producerBuffer != null) {
    +         //a plain store is enough, given that producerIndex prevents any reader/writer to access it
    +         producerBuffer.next = newChunk;
    +      } else {
    +         //it's first one
    +         this.consumerBuffer = newChunk;
    +      }
    +      //making it the current produced one
    +      this.producerBuffer = newChunk;
    +      //store release any previous write and "unblock" anyone waiting resizing to finish
    +      PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
    +      return true;
        }
     
        @Override
    -   public synchronized void close() {
    +   public void close() {
           logger.tracef("Closing %s", this);
           this.isLive = false;
        }
     
    +   private static PagedMessage[] EMPTY_MSG = null;
    +
    +   private static PagedMessage[] noMessages() {
    +      //it is a benign race: no need strong initializations here
    +      PagedMessage[] empty = EMPTY_MSG;
    +      if (empty != null) {
    +         return empty;
    +      } else {
    +         empty = new PagedMessage[0];
    +         EMPTY_MSG = empty;
    +      }
    +      return empty;
    +   }
    +
        @Override
    -   public synchronized PagedMessage[] getMessages() {
    -      return messages.toArray(new PagedMessage[messages.size()]);
    +   public PagedMessage[] getMessages() {
    +      long currentSize;
    +      while ((currentSize = producerIndex) == RESIZING) {
    --- End diff --
   
    Good catch! :+1: While extracting a more generic collection I will refactor these bits to avoid duplication where possible


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    --- End diff --
   
    cough ;) ... for the same comment you left me .... :P


---

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

asfgit
In reply to this post by asfgit
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246578304
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    --- End diff --
   
    If this were a collection, this would almost be throwing an illegal argument exception...
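
    For contrast, a small self-contained example of the collection behaviour being alluded to: a java.util.List rejects an out-of-range index with an exception (an IndexOutOfBoundsException in the JDK), whereas the live page cache returns null instead.

        import java.util.List;

        final class OutOfRangeContrast {
           public static void main(String[] args) {
              List<String> list = List.of("a", "b");
              try {
                 list.get(5); // a JDK collection throws for an out-of-range index
              } catch (IndexOutOfBoundsException e) {
                 System.out.println("collection rejects the index: " + e);
              }
              // LivePageCacheImpl.getMessage, by contrast, returns null for an index >= size
           }
        }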


---