[activemq] branch master updated: AMQ-7353 - fix visibility of marshalledProperties to ensure competing threads don't see partial objects in error. Little test case that demonstrates the problem in isolation

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[activemq] branch master updated: AMQ-7353 - fix visibility of marshalledProperties to ensure competing threads don't see partial objects in error. Little test case that demonstrates the problem in isolation

gtully-2
This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a0c853  AMQ-7353 - fix visibility of marshalledProperties to ensure competing threads don't see partial objects in error. Little test case that demonstrates the problem in isolation
5a0c853 is described below

commit 5a0c853ba0e2ade0b29fa4319857bd8a557995e4
Author: gtully <[hidden email]>
AuthorDate: Tue Nov 26 16:58:30 2019 +0000

    AMQ-7353 - fix visibility of marshalledProperties to ensure competing threads don't see partial objects in error. Little test case that demonstrates the problem in isolation
---
 .../java/org/apache/activemq/ActiveMQSession.java  |   2 +-
 .../java/org/apache/activemq/command/Message.java  |   2 +-
 .../apache/activemq/command/VisibilityTest.java    | 214 +++++++++++++++++++++
 3 files changed, 216 insertions(+), 2 deletions(-)

diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 9b18d89..5910634 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -746,8 +746,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
 
             } finally {
                 connection.removeSession(this);
-                this.transactionContext = null;
                 closed = true;
+                this.transactionContext = null;
             }
         }
     }
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index fca3b46..65b560d 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -79,7 +79,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
     protected String userID;
 
     protected ByteSequence content;
-    protected ByteSequence marshalledProperties;
+    protected volatile ByteSequence marshalledProperties;
     protected DataStructure dataStructure;
     protected int redeliveryCounter;
 
diff --git a/activemq-client/src/test/java/org/apache/activemq/command/VisibilityTest.java b/activemq-client/src/test/java/org/apache/activemq/command/VisibilityTest.java
new file mode 100644
index 0000000..d0b362d
--- /dev/null
+++ b/activemq-client/src/test/java/org/apache/activemq/command/VisibilityTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.command;
+
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+
+// https://issues.apache.org/jira/browse/AMQ-7353
+
+public class VisibilityTest {
+
+    // seems to reproduce easily with a direct referene to bytesSequence
+    // a simpler message LocalMessage with less logic, reproduces ok.
+    // however adding more logic to org.apache.activemq.command.VisibilityTest.LocalMessage.beforeMarshall will throw it off.
+    // It could be down to cache lines, it is a brittle test - but it does demonstrate the problem in theory
+    // an allocation in one thread may not be fully visible in another even after the init has complete!
+    // I wanted to prove the need for the volatile to avoid the npe, doing the extra work when it is not visible is fine
+    // but the NPE is a real problem when it happens.
+    static ActiveMQBytesMessage bytesMessage;
+    static ByteSequence byteSequence;
+    static LocalMessage localMessage;
+
+    static class LocalMessage {
+        public HashMap<String, Object>  properties = new HashMap<>();
+        public /* the fix */ volatile ByteSequence marshalledProperties;
+        public int total;
+        public void setBooleanProperty(String name, boolean v) {
+            properties.put(name, v);
+        }
+
+        public void beforeMarshall(WireFormat ignored) throws IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            // putting the following (real work) in and it won't reproduce
+            //DataOutputStream os = new DataOutputStream(baos);
+            //MarshallingSupport.marshalPrimitiveMap(properties, os);
+            //os.close();
+            total += properties.size();
+            marshalledProperties = baos.toByteSequence();
+        }
+
+        public ByteSequence getMarshalledProperties() {
+            return marshalledProperties;
+        }
+    }
+
+    public static int checkNull() {
+        ByteSequence local = byteSequence;
+        if (local != null) {
+            // if local is non null, the internal buffer may not be visible!
+            return local.getData().length;
+        }
+        return 0;
+    }
+
+    public static int checkNullReference() {
+        ActiveMQBytesMessage message = bytesMessage;
+        if (message != null) {
+            ByteSequence local = message.getMarshalledProperties();
+            if (local != null) {
+                // if local is non null, the internal buffer may not be visible!
+                return local.getData().length;
+            }
+        }
+        return 0;
+    }
+
+    public static int checkNullReferenceOnLocalMessage() {
+        LocalMessage message = localMessage;
+        if (message != null) {
+            ByteSequence local = message.getMarshalledProperties();
+            if (local != null) {
+                // if local is non null, the internal buffer may not be visible!
+                return local.getData().length;
+            }
+        }
+        return 0;
+    }
+
+    @Ignore
+    public void doTestNested() throws Exception {
+        final AtomicBoolean gotError = new AtomicBoolean();
+        final Thread tryingToMarshall = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                long total = 0;
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        total += checkNullReference();
+                    } catch (Throwable t) {
+                        t.printStackTrace();
+                        gotError.set(true);
+                    }
+                }
+                System.out.println("from other thread " + total);
+            }
+        });
+        long len = 0;
+        tryingToMarshall.start();
+        for (int t = 0; t < 10; t++) {
+            for (int i = 0; i < 1000_000; i++) {
+                // real world
+                ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+                // needs non null properties to init marshalledProperties
+                message.setBooleanProperty("B", true);
+                message.beforeMarshall(null);
+                bytesMessage = message;
+                // local access after publish
+                len += message.getMarshalledProperties().getData().length;
+            }
+        }
+        tryingToMarshall.interrupt();
+        tryingToMarshall.join();
+        System.out.println(len);
+        assertFalse("no errors, no npe!", gotError.get());
+    }
+
+
+    @Test
+    public void doTestNestedLocalMessage() throws Exception {
+        final AtomicBoolean gotError = new AtomicBoolean();
+        final Thread tryingToMarshall = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                long total = 0;
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        total += checkNullReferenceOnLocalMessage();
+                    } catch (Throwable t) {
+                        t.printStackTrace();
+                        gotError.set(true);
+                    }
+                }
+                System.out.println("from other thread " + total);
+            }
+        });
+        long len = 0;
+        tryingToMarshall.start();
+        for (int t = 0; t < 10; t++) {
+            for (int i = 0; i < 1000_000; i++) {
+                // real world
+                LocalMessage message = new LocalMessage();
+                // needs non null properties to init marshalledProperties
+                message.setBooleanProperty("B", true);
+                message.beforeMarshall(null);
+                localMessage = message;
+                // local access after publish
+                len += message.getMarshalledProperties().getData().length;
+            }
+        }
+        tryingToMarshall.interrupt();
+        tryingToMarshall.join();
+        System.out.println(len);
+        assertFalse("no errors, no npe!", gotError.get());
+    }
+
+    @Ignore
+    public void doTestDirect() throws Exception {
+        final AtomicBoolean gotError = new AtomicBoolean();
+        final Thread tryingToMarshall = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                long total = 0;
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        total += checkNull();
+                    } catch (Throwable t) {
+                        t.printStackTrace();
+                        gotError.set(true);
+                    }
+                }
+                System.out.println("from other thread " + total);
+            }
+        });
+        long len = 0;
+        tryingToMarshall.start();
+        for (int t = 0; t < 10; t++) {
+            for (int i = 0; i < 1000_000; i++) {
+                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+                byteSequence = byteArrayOutputStream.toByteSequence();
+                // local access after publish
+                len += byteSequence.getData().length;
+            }
+        }
+        tryingToMarshall.interrupt();
+        tryingToMarshall.join();
+        System.out.println(len);
+        assertFalse("no errors, no npe!", gotError.get());
+    }
+
+}
\ No newline at end of file