[GitHub] activemq-artemis pull request #1629: ARTEMIS-1486 Core client should be noti...

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] activemq-artemis pull request #1629: ARTEMIS-1486 Core client should be noti...

clebertsuconic-3
GitHub user stanlyDoge opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1629

    ARTEMIS-1486 Core client should be notified if consumer is closed on …

    …broker side
   
    If consumer is closed on broker using e.g. Hawtio console, client connected as that consumer (representation of broker resource) should be notified about that fact and react to that. It doesn't seem to react. If consumer is closed, as a result of not being notified, client hangs in the air and cannot receive messages.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/stanlyDoge/activemq-artemis-1 ARTEMIS-1486

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1629.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1629
   
----
commit e8bc60ee003213dd90369cf6786190ae1dc24326
Author: Stanislav Knot <[hidden email]>
Date:   2017-10-31T13:08:23Z

    ARTEMIS-1486 Core client should be notified if consumer is closed on broker side

----


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] activemq-artemis pull request #1629: ARTEMIS-1486 Core client should be noti...

clebertsuconic-3
Github user mtaylor commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1629#discussion_r148502455
 
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---
    @@ -1621,6 +1621,7 @@ public boolean closeConsumerWithID(final String sessionID, final String ID) thro
                    for (ServerConsumer serverConsumer : serverConsumers) {
                       if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
                          serverConsumer.close(true);
    +                     serverConsumer.disconnect();
                          return true;
    --- End diff --
   
    Something to note here.  When the server calls disconnect, it sends a DisconnectConsumer packet to the client.  The client processes this packet (cleans up some local state) then sends a CloseConsumer packet to the broker.  Since the broker has already closed the consumer it will log an error message.
   
    Since this is a "ForceClose" style operation, I think it's OK just to document that this may happen.  I think an improvement on this is to update the client to only send the close consumer packet if the close is called from the client API and not via the disconnect consumer packet.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] activemq-artemis pull request #1629: ARTEMIS-1486 Core client should be noti...

clebertsuconic-3
In reply to this post by clebertsuconic-3
Github user mtaylor commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1629#discussion_r148502940
 
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java ---
    @@ -1978,6 +1980,36 @@ public void testConnectorServiceManagement() throws Exception {
           Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
        }
     
    +   @Test
    +   public void testCloseConsumer() throws Exception {
    +      SimpleString address = RandomUtil.randomSimpleString();
    +      SimpleString name = RandomUtil.randomSimpleString();
    +      boolean durable = true;
    +
    +      ActiveMQServerControl serverControl = createManagementControl();
    +
    +      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
    +      serverControl.createAddress(address.toString(), "ANYCAST");
    +      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
    +
    +      ServerLocator receiveLocator = createInVMNonHALocator();
    +      ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
    +      ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
    +      final ClientConsumer consumer = receiveClientSession.createConsumer(name);
    +      final ClientProducer producer = receiveClientSession.createProducer(name);
    +
    +      ServerSession ss = server.getSessions().iterator().next();
    +      ServerConsumer sc = ss.getServerConsumers().iterator().next();
    +
    +      producer.send(receiveClientSession.createMessage(true));
    +      consumer.receive(1000);
    +
    +      Assert.assertFalse(consumer.isClosed());
    +      serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(), Long.toString(sc.sequentialID()));
    +      Wait.waitFor(() -> consumer.isClosed(), 1000, 100);
    +      Assert.assertTrue(consumer.isClosed());
    +   }
    +
        protected void scaleDown(ScaleDownHandler handler) throws Exception {
    --- End diff --
   
    Nice test.  I realise this is outside the scope of this JIRA, but some more testing around other protocols would be useful, in addition it'd be good to test that this works with the Artemis JMS Client since it uses CORE under the covers.  We can do these in follow up commits.
   
    +1 overall.  Merging


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] activemq-artemis pull request #1629: ARTEMIS-1486 Core client should be noti...

clebertsuconic-3
In reply to this post by clebertsuconic-3
Github user asfgit closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/1629


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] activemq-artemis pull request #1629: ARTEMIS-1486 Core client should be noti...

clebertsuconic-3
In reply to this post by clebertsuconic-3
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1629#discussion_r148523746
 
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java ---
    @@ -1978,6 +1980,36 @@ public void testConnectorServiceManagement() throws Exception {
           Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
        }
     
    +   @Test
    +   public void testCloseConsumer() throws Exception {
    +      SimpleString address = RandomUtil.randomSimpleString();
    +      SimpleString name = RandomUtil.randomSimpleString();
    +      boolean durable = true;
    +
    +      ActiveMQServerControl serverControl = createManagementControl();
    +
    +      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
    +      serverControl.createAddress(address.toString(), "ANYCAST");
    +      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
    +
    +      ServerLocator receiveLocator = createInVMNonHALocator();
    +      ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
    +      ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
    +      final ClientConsumer consumer = receiveClientSession.createConsumer(name);
    +      final ClientProducer producer = receiveClientSession.createProducer(name);
    +
    +      ServerSession ss = server.getSessions().iterator().next();
    +      ServerConsumer sc = ss.getServerConsumers().iterator().next();
    +
    +      producer.send(receiveClientSession.createMessage(true));
    +      consumer.receive(1000);
    +
    +      Assert.assertFalse(consumer.isClosed());
    +      serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(), Long.toString(sc.sequentialID()));
    +      Wait.waitFor(() -> consumer.isClosed(), 1000, 100);
    --- End diff --
   
    With my pull request on Wait, you would been able to do Wait.assertTrue(consumer::isClosed)


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] activemq-artemis pull request #1629: ARTEMIS-1486 Core client should be noti...

clebertsuconic-3
In reply to this post by clebertsuconic-3
Github user stanlyDoge commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1629#discussion_r148531299
 
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java ---
    @@ -1978,6 +1980,36 @@ public void testConnectorServiceManagement() throws Exception {
           Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
        }
     
    +   @Test
    +   public void testCloseConsumer() throws Exception {
    +      SimpleString address = RandomUtil.randomSimpleString();
    +      SimpleString name = RandomUtil.randomSimpleString();
    +      boolean durable = true;
    +
    +      ActiveMQServerControl serverControl = createManagementControl();
    +
    +      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
    +      serverControl.createAddress(address.toString(), "ANYCAST");
    +      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
    +
    +      ServerLocator receiveLocator = createInVMNonHALocator();
    +      ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
    +      ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
    +      final ClientConsumer consumer = receiveClientSession.createConsumer(name);
    +      final ClientProducer producer = receiveClientSession.createProducer(name);
    +
    +      ServerSession ss = server.getSessions().iterator().next();
    +      ServerConsumer sc = ss.getServerConsumers().iterator().next();
    +
    +      producer.send(receiveClientSession.createMessage(true));
    +      consumer.receive(1000);
    +
    +      Assert.assertFalse(consumer.isClosed());
    +      serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(), Long.toString(sc.sequentialID()));
    +      Wait.waitFor(() -> consumer.isClosed(), 1000, 100);
    --- End diff --
   
    ah, thanks


---