[GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor...


[GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor...

asfgit
GitHub user shoukunhuai opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1742

    ARTEMIS-1570 Flush appendExecutor before take journal snapshot

    When the live server starts replication, it must make sure there are
    no pending writes in the message and bindings journals, or we may
    lose journal records during the initial replication.

    So we need to flush the append executor after acquiring the
    StorageManager's write lock and before taking the Journal's write lock.
    We also set a 10-second timeout on the flush, the same as
    Journal::flushExecutor. If we fail to flush within 10 seconds,
    we abort the replication and the backup will try again later.

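The locking-and-flush ordering described above can be sketched as follows. This is a minimal illustration under assumed names (storageManagerLock, journalLock, appendExecutor), not the PR's actual code; the flush helper mirrors the marker-task idiom that Journal::flushExecutor uses.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class FlushBeforeSnapshotSketch {
        final ReadWriteLock storageManagerLock = new ReentrantReadWriteLock();
        final ReadWriteLock journalLock = new ReentrantReadWriteLock();
        final ExecutorService appendExecutor = Executors.newSingleThreadExecutor();

        // Marker-task idiom: once the latch is released, every append that
        // was queued before the marker has been executed.
        static boolean flush(Executor executor, long timeout, TimeUnit unit) {
            CountDownLatch latch = new CountDownLatch(1);
            executor.execute(latch::countDown);
            try {
                return latch.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        boolean startReplicationSnapshot() {
            storageManagerLock.writeLock().lock();  // block new journal writes
            try {
                // Drain appends already queued, with the same 10-second
                // budget as Journal::flushExecutor; on timeout, abort this
                // attempt so the backup can retry later.
                if (!flush(appendExecutor, 10, TimeUnit.SECONDS)) {
                    return false;
                }
                journalLock.writeLock().lock();     // only now lock the journal
                try {
                    // ... take the journal snapshot and start replication ...
                    return true;
                } finally {
                    journalLock.writeLock().unlock();
                }
            } finally {
                storageManagerLock.writeLock().unlock();
            }
        }
    }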
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shoukunhuai/activemq-artemis flush-journal-executor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1742.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1742
   
----
commit 3d4c45925f2fe274579b262cedd63103ef29cb4e
Author: shoukun <shoukunhuai@...>
Date:   2017-12-27T02:23:33Z

    Flush appendExecutor before take journal snapshot
   
    When the live server starts replication, it must make sure there are
    no pending writes in the message and bindings journals, or we may
    lose journal records during the initial replication.

    So we need to flush the append executor after acquiring the
    StorageManager's write lock and before taking the Journal's write lock.
    We also set a 10-second timeout on the flush, the same as
    Journal::flushExecutor. If we fail to flush within 10 seconds,
    we abort the replication and the backup will try again later.

----


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    How did you find this? Nice job.

    I will see if I can change the logic to not need the wait.

    Any tests you can share?

    I'm out this week for the Xmas break. I will take a look next week, OK?


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user shoukunhuai commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    @clebertsuconic It would be great if we can avoid the wait.
    Please do not merge; I will push my test.


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    @shoukunhuai I thought about developing a Byteman test with some rules that pause execution at the critical point and then release it.

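A Byteman/BMUnit test of the kind mentioned here could look roughly like the sketch below. The targeted class and method are hypothetical hook points, not the rules an actual test for this race would need; waitFor and signalWake are Byteman built-ins for suspending and releasing threads at the injected location.

    import org.jboss.byteman.contrib.bmunit.BMRule;
    import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
    import org.junit.Test;
    import org.junit.runner.RunWith;

    @RunWith(BMUnitRunner.class)
    public class PauseAppendDuringReplicationTest {

        // Hypothetical rule: suspend any thread entering appendAddRecord
        // until a helper thread calls signalWake("resume-appends", true).
        @Test
        @BMRule(name = "hold journal appends until replication has started",
                targetClass = "org.apache.activemq.artemis.core.journal.impl.JournalImpl",
                targetMethod = "appendAddRecord",
                targetLocation = "AT ENTRY",
                action = "waitFor(\"resume-appends\")")
        public void testSnapshotWithPendingAppend() throws Exception {
            // start live and backup, wait for replication to begin, then
            // release the suspended append and verify no record is lost
        }
    }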

---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user shoukunhuai commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    @clebertsuconic I just pushed my test for your reference.


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    @shoukunhuai Looks like some checkstyle bits need tidying up, judging by the PR build failure.


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user jbertram commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    @clebertsuconic, what's the status of this?


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    I haven't had time for it yet; I'm still working on compatibility issues.


---

[GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor...

asfgit
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1742#discussion_r162418130
 
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java ---
    @@ -0,0 +1,388 @@
    +package org.apache.activemq.artemis.tests.integration.replication;
    +
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
    +import org.apache.activemq.artemis.api.core.ActiveMQException;
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.RoutingType;
    +import org.apache.activemq.artemis.api.core.client.*;
    +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
    +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
    +import org.apache.activemq.artemis.core.config.Configuration;
    +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
    +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
    +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
    +import org.apache.activemq.artemis.core.io.SequentialFileFactory;
    +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
    +import org.apache.activemq.artemis.core.journal.LoaderCallback;
    +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
    +import org.apache.activemq.artemis.core.journal.RecordInfo;
    +import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
    +import org.apache.activemq.artemis.core.message.impl.CoreMessage;
    +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
    +import org.apache.activemq.artemis.core.persistence.Persister;
    +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
    +import org.apache.activemq.artemis.core.server.ActiveMQServer;
    +import org.apache.activemq.artemis.core.server.ActiveMQServers;
    +import org.apache.activemq.artemis.core.server.JournalType;
    +import org.apache.activemq.artemis.junit.Wait;
    +import org.jboss.logging.Logger;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +public class SharedNothingReplicationTest {
    +    private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class);
    +
    +    @Rule
    +    public TemporaryFolder brokersFolder = new TemporaryFolder();
    +
    +    private SlowMessagePersister slowMessagePersister;
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        slowMessagePersister = new SlowMessagePersister();
    +        CoreMessagePersister.theInstance = slowMessagePersister;
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        if (slowMessagePersister != null) {
    +            CoreMessagePersister.theInstance = slowMessagePersister.persister;
    +        }
    +    }
    +
    +    @Test
    +    public void testReplicateFromSlowLive() throws Exception {
    +        // start live
    +        Configuration liveConfiguration = createLiveConfiguration();
    +        ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration);
    +        liveServer.start();
    +
    +        Wait.waitFor(() -> liveServer.isStarted());
    +
    +        CoreMessagePersister.theInstance = SlowMessagePersister._getInstance();
    +
    +        final CountDownLatch replicated = new CountDownLatch(1);
    +
    +        ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
    +        locator.setCallTimeout(60_000L);
    +        locator.setConnectionTTL(60_000L);
    +        locator.addClusterTopologyListener(new ClusterTopologyListener() {
    +            @Override
    +            public void nodeUP(TopologyMember member, boolean last) {
    +                logger.infof("nodeUP fired last=%s, live=%s, backup=%s", last, member.getLive(), member.getBackup());
    +                if (member.getBackup() != null) {
    +                    replicated.countDown();
    +                }
    +            }
    +
    +            @Override
    +            public void nodeDown(long eventUID, String nodeID) {
    +
    +            }
    +        });
    +
    +        final ClientSessionFactory csf = locator.createSessionFactory();
    +        ClientSession sess = csf.createSession();
    +        sess.createQueue("slow", RoutingType.ANYCAST, "slow", true);
    +        sess.close();
    +        Executor sendMessageExecutor = Executors.newCachedThreadPool();
    +
    +
    +        // let's write some messages
    +        int i = 0;
    +        final int j = 50;
    +        final CountDownLatch allMessageSent = new CountDownLatch(j);
    +        while (i < 5) {
    +            sendMessageExecutor.execute(() -> {
    +                try {
    +                    ClientSession session = csf.createSession(true, true);
    +                    ClientProducer producer = session.createProducer("slow");
    +                    ClientMessage message = session.createMessage(true);
    +                    // this will make journal's append executor busy
    +                    message.putLongProperty("delay", Long.getLong("message.property.delay",500L));
    --- End diff --
   
    Where? How does this property affect the semantics of the server? I didn't find it anywhere.


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    Nice job on this one! Impressed!


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    Do you want to contribute more? I can help you in any way you need.


---

[GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor...

asfgit
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1742#discussion_r162421523
 
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java ---
    @@ -0,0 +1,388 @@
    [... snip: same diff context as quoted in the previous review comment ...]
    +                    // this will make journal's append executor busy
    +                    message.putLongProperty("delay", Long.getLong("message.property.delay",500L));
    --- End diff --
   
    Never mind, I see it: it's in your extended persister.

    I don't think we need a system property; the test can just keep it constant at 500L.

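For context, the delaying persister the test installs (SlowMessagePersister in the quoted diff) presumably works along these lines. The sketch below is a reconstruction from the quoted test code, not the PR's actual class; the delegate field and getInstance wiring are assumptions.

    import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
    import org.apache.activemq.artemis.api.core.Message;
    import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;

    // Sleeping inside encode() keeps the journal's append executor busy
    // for the duration of the message's "delay" property, which opens the
    // window the test needs between append submission and execution.
    class SlowMessagePersisterSketch extends CoreMessagePersister {
        final CoreMessagePersister persister = CoreMessagePersister.getInstance();

        @Override
        public void encode(ActiveMQBuffer buffer, Message record) {
            Long delay = record.getLongProperty("delay");
            if (delay != null && delay > 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            persister.encode(buffer, record);
        }
    }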

---

[GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor...

asfgit
Github user asfgit closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/1742


---

[GitHub] activemq-artemis issue #1742: ARTEMIS-1570 Flush appendExecutor before take ...

asfgit
Github user shoukunhuai commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1742
 
    @clebertsuconic Sorry, I almost forgot about this; I've been busy with something else.
    Looks like this issue was resolved?


---