Authorization plugin - don't know what to do with client attempting to write to weird advisory topic

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Authorization plugin - don't know what to do with client attempting to write to weird advisory topic

Jędrzej Dudkiewicz
I wrote yet another certificate-based authentication/authorization
plugin. It reads CN from certificate provided by the client and
depending on what is in there it allows or dissalows
creation/deletion/writing to topics/queues. To test it I wrote a
simple Java client that uses OpenWire. Connection is made
successfully, CN is read and permissions are added are expected.
Unfortunately after connection is created (addConnection in plugin is
called), destination "topic://ActiveMQ.Advisory.Connection" is created
by the client, or rather in its context (ok, unexpected, but it makes
sense as I used advisory topics), method addConsumer() (from
BrokerFilter) is called, and its destination is:

DEST: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
IS TOPIC: true
PHYSNAME: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
DEST AS STR: Topic
QUALIFIED NAME:
topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
DEST PATHS: topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
IS TEMP: false

This means that it is a topic, its physical name is equal to qualified
name, which makes no sense whatsoever, what more this name is made up
from two topics names, it isn't temporary (not what name suggests) and
to add insult to the injury its "getDestinationPaths()" method returns
array with the following elements:
"topic://ActiveMQ"
"Advisory"
"TempQueue,topic://ActiveMQ"
"Advisory"
"TempTopic"

I wrote my plugin using
"org.apache.activemq.security.AuthorizationBroker" as a base and aside
from this and single problem with MQTT protocol it works fine.

What is the reason for this topic's existence?
Why is its name so weird?
Should I expect more destinations with such names?
Is it enough to check for exactly this name and I will be done? I did
it and it works, but I can't be sure that it won't happen again with
different weird topic name.

Thanks in advance,
--
Jędrzej Dudkiewicz

I really hate this damn machine, I wish that they would sell it.
It never does just what I want, but only what I tell it.
Reply | Threaded
Open this post in threaded view
|

Re: Authorization plugin - don't know what to do with client attempting to write to weird advisory topic

christopher.l.shannon
These are advisory messages, I suggest you read
https://activemq.apache.org/advisory-message.html which will explain what
they are and how to configure/disable them.

On Tue, Sep 24, 2019 at 4:52 PM Jędrzej Dudkiewicz <
[hidden email]> wrote:

> I wrote yet another certificate-based authentication/authorization
> plugin. It reads CN from certificate provided by the client and
> depending on what is in there it allows or dissalows
> creation/deletion/writing to topics/queues. To test it I wrote a
> simple Java client that uses OpenWire. Connection is made
> successfully, CN is read and permissions are added are expected.
> Unfortunately after connection is created (addConnection in plugin is
> called), destination "topic://ActiveMQ.Advisory.Connection" is created
> by the client, or rather in its context (ok, unexpected, but it makes
> sense as I used advisory topics), method addConsumer() (from
> BrokerFilter) is called, and its destination is:
>
> DEST:
> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> IS TOPIC: true
> PHYSNAME:
> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> DEST AS STR: Topic
> QUALIFIED NAME:
> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> DEST PATHS:
> topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> IS TEMP: false
>
> This means that it is a topic, its physical name is equal to qualified
> name, which makes no sense whatsoever, what more this name is made up
> from two topics names, it isn't temporary (not what name suggests) and
> to add insult to the injury its "getDestinationPaths()" method returns
> array with the following elements:
> "topic://ActiveMQ"
> "Advisory"
> "TempQueue,topic://ActiveMQ"
> "Advisory"
> "TempTopic"
>
> I wrote my plugin using
> "org.apache.activemq.security.AuthorizationBroker" as a base and aside
> from this and single problem with MQTT protocol it works fine.
>
> What is the reason for this topic's existence?
> Why is its name so weird?
> Should I expect more destinations with such names?
> Is it enough to check for exactly this name and I will be done? I did
> it and it works, but I can't be sure that it won't happen again with
> different weird topic name.
>
> Thanks in advance,
> --
> Jędrzej Dudkiewicz
>
> I really hate this damn machine, I wish that they would sell it.
> It never does just what I want, but only what I tell it.
>
Reply | Threaded
Open this post in threaded view
|

Re: Authorization plugin - don't know what to do with client attempting to write to weird advisory topic

Tim Bain
Chris,

The problem he's pointing out is that the destination name is the
comma-separated concatenation of two advisory topic names. It feels like
there's a missing String.split() call earlier in the code path.

I haven't had time to go looking for where that might be, and won't for
at least a week, though if someone else has time to do it, that would
be great. Jędrzej, could you please capture a stack trace at the point
where you're seeing this behavior, to help whoever investigates this?

Tim

On Thu, Sep 26, 2019, 4:26 AM Christopher Shannon <
[hidden email]> wrote:

> These are advisory messages, I suggest you read
> https://activemq.apache.org/advisory-message.html which will explain what
> they are and how to configure/disable them.
>
> On Tue, Sep 24, 2019 at 4:52 PM Jędrzej Dudkiewicz <
> [hidden email]> wrote:
>
> > I wrote yet another certificate-based authentication/authorization
> > plugin. It reads CN from certificate provided by the client and
> > depending on what is in there it allows or dissalows
> > creation/deletion/writing to topics/queues. To test it I wrote a
> > simple Java client that uses OpenWire. Connection is made
> > successfully, CN is read and permissions are added are expected.
> > Unfortunately after connection is created (addConnection in plugin is
> > called), destination "topic://ActiveMQ.Advisory.Connection" is created
> > by the client, or rather in its context (ok, unexpected, but it makes
> > sense as I used advisory topics), method addConsumer() (from
> > BrokerFilter) is called, and its destination is:
> >
> > DEST:
> > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > IS TOPIC: true
> > PHYSNAME:
> > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > DEST AS STR: Topic
> > QUALIFIED NAME:
> > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > DEST PATHS:
> > topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> > IS TEMP: false
> >
> > This means that it is a topic, its physical name is equal to qualified
> > name, which makes no sense whatsoever, what more this name is made up
> > from two topics names, it isn't temporary (not what name suggests) and
> > to add insult to the injury its "getDestinationPaths()" method returns
> > array with the following elements:
> > "topic://ActiveMQ"
> > "Advisory"
> > "TempQueue,topic://ActiveMQ"
> > "Advisory"
> > "TempTopic"
> >
> > I wrote my plugin using
> > "org.apache.activemq.security.AuthorizationBroker" as a base and aside
> > from this and single problem with MQTT protocol it works fine.
> >
> > What is the reason for this topic's existence?
> > Why is its name so weird?
> > Should I expect more destinations with such names?
> > Is it enough to check for exactly this name and I will be done? I did
> > it and it works, but I can't be sure that it won't happen again with
> > different weird topic name.
> >
> > Thanks in advance,
> > --
> > Jędrzej Dudkiewicz
> >
> > I really hate this damn machine, I wish that they would sell it.
> > It never does just what I want, but only what I tell it.
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Authorization plugin - don't know what to do with client attempting to write to weird advisory topic

Jędrzej Dudkiewicz
On Thu, Sep 26, 2019 at 3:01 PM Tim Bain <[hidden email]> wrote:
>
> Chris,
>
> The problem he's pointing out is that the destination name is the
> comma-separated concatenation of two advisory topic names. It feels like
> there's a missing String.split() call earlier in the code path.

Precisely. I'm aware of advisory messages and in fact I want to use
them and all would be fine if it weren't for those pesky unsplit
names.

> I haven't had time to go looking for where that might be, and won't for
> at least a week, though if someone else has time to do it, that would
> be great. Jędrzej, could you please capture a stack trace at the point
> where you're seeing this behavior, to help whoever investigates this?

Below is log (TRACE level, from "bin/activemq console") resulting from
connecting using client below. Only part after everything settled is
included:

DEBUG | Checkpoint started.
DEBUG | Checkpoint done.
DEBUG | Checkpoint started.
TRACE | Last update: 1:60346, full gc candidates set: [1]
TRACE | gc candidates after producerSequenceIdTrackerLocation:1:59943, []
TRACE | gc candidates after ackMessageFileMapLocation:1:60346, []
TRACE | gc candidates after in progress tx range:[null, null], []
TRACE | gc candidates: []
TRACE | ackMessageFileMap: {}
TRACE | Not yet time to check for compaction: 1 of 10 cycles
DEBUG | Checkpoint done.
TRACE | Execute[ActiveMQ BrokerService[localhost] Task] runnable:
org.apache.activemq.broker.TransportConnector$1$1@4629be82
TRACE | Created thread[ActiveMQ BrokerService[localhost] Task-1]:
Thread[ActiveMQ BrokerService[localhost] Task-1,5,main]
TRACE | Starting connection check task for: ssl:///127.0.0.1:38422
TRACE | TCP consumer thread for ssl:///127.0.0.1:38422 starting
DEBUG | Sending: WireFormatInfo { version=12,
properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true,
PlatformDetails=Java, CacheEnabled=true, TightEncodingEnabled=true,
MaxFrameSize=104857600, MaxInactivityDuration=30000,
MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.10},
magic=[A,c,t,i,v,e,M,Q]}
TRACE | Created thread[ActiveMQ BrokerService[localhost] Task-2]:
Thread[ActiveMQ BrokerService[localhost] Task-2,5,main]
TRACE | Stopping connection check task for: ssl:///127.0.0.1:38422
TRACE | Running task iteration 0 - Transport Connection to:
tcp://127.0.0.1:38422
DEBUG | Using min of local: WireFormatInfo { version=12,
properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true,
PlatformDetails=Java, CacheEnabled=true, TightEncodingEnabled=true,
MaxFrameSize=104857600, MaxInactivityDuration=30000,
MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.10},
magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=3,
properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true,
TightEncodingEnabled=true, MaxInactivityDuration=30000,
MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
DEBUG | Received WireFormat: WireFormatInfo { version=3,
properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true,
TightEncodingEnabled=true, MaxInactivityDuration=30000,
MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
DEBUG | ssl:///127.0.0.1:38422 before negotiation:
OpenWireFormat{version=12, cacheEnabled=false,
stackTraceEnabled=false, tightEncodingEnabled=false,
sizePrefixDisabled=false, maxFrameSize=104857600}
DEBUG | ssl:///127.0.0.1:38422 after negotiation:
OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true,
tightEncodingEnabled=true, sizePrefixDisabled=false,
maxFrameSize=104857600}
TRACE | Running task iteration 1 - Transport Connection to:
tcp://127.0.0.1:38422
TRACE | Run task done: Transport Connection to: tcp://127.0.0.1:38422
DEBUG | Setting up new connection id:
ID:gojira-34271-1569506356763-0:0, address: tcp://127.0.0.1:38422,
info: ConnectionInfo {commandId = 1, responseRequired = true,
connectionId = ID:gojira-34271-1569506356763-0:0, clientId = abc,
clientIp = null, userName = null, password = *****, brokerPath = null,
brokerMasterConnector = false, manageable = true, clientMaster = true,
faultTolerant = false, failoverReconnect = false}
DEBUG | add conn: abc/tcp://127.0.0.1:38422/null
 INFO | Allowing client:
zs.node.service.P-00666/7a1e7236-dfb7-11e9-ae1a-47ac10dfc31e
DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
DEBUG | add destination: abc/topic://ActiveMQ.Advisory.Connection
DEBUG | localhost adding destination: topic://ActiveMQ.Advisory.Connection
TRACE | Running task iteration 0 - Transport Connection to:
tcp://127.0.0.1:38422
TRACE | Running task iteration 1 - Transport Connection to:
tcp://127.0.0.1:38422
TRACE | Run task done: Transport Connection to: tcp://127.0.0.1:38422
WARN | CHECK CAN READ: zs.node.service.P-00666
DEST: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
IS TOPIC: true
PHYSNAME: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
DEST AS STR: Topic
QUALIFIED NAME:
topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
DEST PATHS: topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
IS TEMP: false
java.lang.RuntimeException: got concatenated destination physical name
    at zs.comm.amq.auth.CertAuthPlugin.checkCanRead(CertAuthPlugin.java:156)
    at zs.comm.amq.auth.CertAuthPlugin.addConsumer(CertAuthPlugin.java:46)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)
    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)
DEBUG | Error occured while processing sync command: ConsumerInfo
{commandId = 2, responseRequired = true, consumerId =
ID:gojira-34271-1569506356763-0:0:-1:1, destination =
topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic,
prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false,
dispatchAsync = false, selector = null, clientId = null,
subscriptionName = null, noLocal = true, exclusive = false,
retroactive = false, priority = 0, brokerPath = null,
optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate
= null, networkConsumerIds = null}, exception:
java.lang.RuntimeException: got concatenated destination physical name
java.lang.RuntimeException: got concatenated destination physical name
    at zs.comm.amq.auth.CertAuthPlugin.checkCanRead(CertAuthPlugin.java:156)[auth-0.0.1-SNAPSHOT.jar:]
    at zs.comm.amq.auth.CertAuthPlugin.addConsumer(CertAuthPlugin.java:46)[auth-0.0.1-SNAPSHOT.jar:]
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)[activemq-client-5.15.10.jar:5.15.10]
    at java.lang.Thread.run(Thread.java:748)[:1.8.0_222]
TRACE | Shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@55228189[Running, pool size =
0, active threads = 0, queued tasks = 0, completed tasks = 0] with
await termination: 10000 millis
DEBUG | Shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@55228189[Terminated, pool size
= 0, active threads = 0, queued tasks = 0, completed tasks = 0] is
shutdown: true and terminated: true took: 0.002 seconds.
DEBUG | Transport Connection to: tcp://127.0.0.1:38422 failed:
java.io.EOFException
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)[:1.8.0_222]
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)[activemq-client-5.15.10.jar:5.15.10]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)[activemq-client-5.15.10.jar:5.15.10]
    at java.lang.Thread.run(Thread.java:748)[:1.8.0_222]
TRACE | Execute[ActiveMQ BrokerService[localhost] Task] runnable:
org.apache.activemq.broker.TransportConnection$4@7e75966d
DEBUG | Unregistering MBean
org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire,connectionViewType=clientId,connectionName=abc
DEBUG | Unregistering MBean
org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire,connectionViewType=remoteAddress,connectionName=tcp_//127.0.0.1_38422
DEBUG | Stopping connection: tcp://127.0.0.1:38422
DEBUG | Stopping transport ssl:///127.0.0.1:38422
DEBUG | Initialized TaskRunnerFactory[ActiveMQ Task] using
ExecutorService:
java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Running, pool size =
0, active threads = 0, queued tasks = 0, completed tasks = 0]
TRACE | Execute[ActiveMQ Task] runnable:
org.apache.activemq.transport.tcp.TcpTransport$1@78a078c4
TRACE | Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
TRACE | Closing socket 40b115e[TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384:
Socket[addr=/127.0.0.1,port=38422,localport=61616]]
DEBUG | Closed socket 40b115e[TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384:
Socket[addr=/127.0.0.1,port=38422,localport=61616]]
DEBUG | Forcing shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Running, pool size =
1, active threads = 0, queued tasks = 0, completed tasks = 1]
TRACE | Shutdown of ExecutorService:
java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Shutting down, pool
size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
is shutdown: true and terminated: false.
DEBUG | Stopped transport: tcp://127.0.0.1:38422
TRACE | Shutdown timeout: 1 task: Transport Connection to: tcp://127.0.0.1:38422
DEBUG | Cleaning up connection resources: tcp://127.0.0.1:38422
DEBUG | remove connection id: ID:gojira-34271-1569506356763-0:0
DEBUG | Remove connection: abc/java.io.EOFException
DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
DEBUG | add destination: abc/topic://ActiveMQ.Advisory.Connection
DEBUG | Connection Stopped: tcp://127.0.0.1:38422
DEBUG | Checkpoint started.
DEBUG | Checkpoint done.
^C INFO | Apache ActiveMQ 5.15.10 (localhost,
ID:gojira-40629-1569506325958-0:1) is shutting down
DEBUG | Caught exception, must be shutting down. This exception is ignored.
java.lang.IllegalStateException: Shutdown in progress
    at java.lang.ApplicationShutdownHooks.remove(ApplicationShutdownHooks.java:82)[:1.8.0_222]
    at java.lang.Runtime.removeShutdownHook(Runtime.java:239)[:1.8.0_222]
    at org.apache.activemq.broker.BrokerService.removeShutdownHook(BrokerService.java:2551)[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.BrokerService.stop(BrokerService.java:838)[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.xbean.XBeanBrokerService.stop(XBeanBrokerService.java:122)[activemq-spring-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.BrokerService.containerShutdown(BrokerService.java:2574)[activemq-broker-5.15.10.jar:5.15.10]
    at org.apache.activemq.broker.BrokerService$7.run(BrokerService.java:2541)[activemq-broker-5.15.10.jar:5.15.10]
DEBUG | Unregistering MBean
org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire



There is more, but it relates to broker shutting down.

Below is code of Java client used:


**************** FILE App.java *******************
package jd.test.jmstest;

import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.SecureRandom;

import javax.jms.Session;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

public class App {
    public static void main(String[] args) throws Throwable {
        System.out.println("Hello World!");
        ActiveMQSslConnectionFactory f = new ActiveMQSslConnectionFactory();
        f.setBrokerURL("ssl://localhost:61616");
        f.setClientID("abc");
        f.setKeyAndTrustManagers(getKeyManager(), getTrustManager(),
new SecureRandom());
        ActiveMQConnection conn = (ActiveMQConnection) f.createConnection();
        System.out.println(conn.getClientID());
        System.out.println(conn.getClass());
        ActiveMQSession s = (ActiveMQSession)
conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQMessageConsumer cons2 =
(ActiveMQMessageConsumer)s.createConsumer(new
ActiveMQTopic("inbound"));
        cons2.setMessageListener(new InboundMessageListener());
        conn.start();
    }

    private static TrustManager[] getTrustManager() throws Throwable {
        KeyStore ts = KeyStore.getInstance("JKS");
        ts.load(new FileInputStream("trust.jks"), "trustpass".toCharArray());
        TrustManagerFactory tm =
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tm.init(ts);
        return tm.getTrustManagers();
    }

    private static KeyManager[] getKeyManager() throws Throwable {
        KeyStore ts = KeyStore.getInstance("JKS");
        ts.load(new FileInputStream("svc01.jks"), "svcpass".toCharArray());
        KeyManagerFactory km =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        km.init(ts, "svcpass".toCharArray());
        return km.getKeyManagers();
    }
}

******************* FILE InboundMessageListener.java ************
package jd.test.jmstest;

import java.nio.charset.StandardCharsets;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.activemq.command.ActiveMQBytesMessage;

final class InboundMessageListener implements MessageListener {
    public void onMessage(Message message) {
        ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message;
        try {
            byte[] arr = new byte[(int) msg.getBodyLength()];
            msg.readBytes(arr);
            System.out.println("INBOUND MSG: " + new String(arr,
StandardCharsets.UTF_8));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

*************************** END **********************

I suppose same thing happens without SSL, but I include everything in
case it is something on my side - I do not include plugin code as
simplest plugin below receives same topic name:

******************* FILE CertAuthPlugin2.java *****************
package zs.comm.amq.auth;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.log4j.Logger;

public class CertAuthPlugin2 extends BrokerFilter {

    private final Logger l = Logger.getLogger(getClass());

    public CertAuthPlugin2(Broker borker) {
        super(borker);
    }

    @Override
    public Subscription addConsumer(ConnectionContext context,
ConsumerInfo info) throws Exception {
        if (!checkCanRead(info.getDestination())) {
        }
        return super.addConsumer(context, info);
    }

    protected boolean checkCanRead(ActiveMQDestination destination) {
        l.warn("CHECK CAN READ: \nDEST: " + destination + "\nIS TOPIC: "
                + destination.isTopic() + "\nPHYSNAME: " +
destination.getPhysicalName() + "\nDEST AS STR: "
                + destination.getDestinationTypeAsString() +
"\nQUALIFIED NAME: " + destination.getQualifiedName()
                + "\nDEST PATHS: " + String.join("|",
destination.getDestinationPaths()) + "\nIS TEMP: "
                + destination.isTemporary());
        if (destination.isTopic() && destination.getPhysicalName()

.equals("topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic"))
{
            try {
                    throw new RuntimeException("got concatenated
destination physical name");
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }
        }
        return true;
    }
}

******************** END ******************************

And log (this time only WARN level, but stacktrace is there):

 WARN | CHECK CAN READ:
DEST: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
IS TOPIC: true
PHYSNAME: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
DEST AS STR: Topic
QUALIFIED NAME:
topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
DEST PATHS: topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
IS TEMP: false
java.lang.RuntimeException: got concatenated destination physical name
    at zs.comm.amq.auth.CertAuthPlugin2.checkCanRead(CertAuthPlugin2.java:44)
    at zs.comm.amq.auth.CertAuthPlugin2.addConsumer(CertAuthPlugin2.java:30)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)
    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)



I hope that it is enough to get someone started.

JD


> Tim
>
> On Thu, Sep 26, 2019, 4:26 AM Christopher Shannon <
> [hidden email]> wrote:
>
> > These are advisory messages, I suggest you read
> > https://activemq.apache.org/advisory-message.html which will explain what
> > they are and how to configure/disable them.
> >
> > On Tue, Sep 24, 2019 at 4:52 PM Jędrzej Dudkiewicz <
> > [hidden email]> wrote:
> >
> > > I wrote yet another certificate-based authentication/authorization
> > > plugin. It reads CN from certificate provided by the client and
> > > depending on what is in there it allows or dissalows
> > > creation/deletion/writing to topics/queues. To test it I wrote a
> > > simple Java client that uses OpenWire. Connection is made
> > > successfully, CN is read and permissions are added are expected.
> > > Unfortunately after connection is created (addConnection in plugin is
> > > called), destination "topic://ActiveMQ.Advisory.Connection" is created
> > > by the client, or rather in its context (ok, unexpected, but it makes
> > > sense as I used advisory topics), method addConsumer() (from
> > > BrokerFilter) is called, and its destination is:
> > >
> > > DEST:
> > > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > > IS TOPIC: true
> > > PHYSNAME:
> > > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > > DEST AS STR: Topic
> > > QUALIFIED NAME:
> > > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > > DEST PATHS:
> > > topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> > > IS TEMP: false
> > >
> > > This means that it is a topic, its physical name is equal to qualified
> > > name, which makes no sense whatsoever, what more this name is made up
> > > from two topics names, it isn't temporary (not what name suggests) and
> > > to add insult to the injury its "getDestinationPaths()" method returns
> > > array with the following elements:
> > > "topic://ActiveMQ"
> > > "Advisory"
> > > "TempQueue,topic://ActiveMQ"
> > > "Advisory"
> > > "TempTopic"
> > >
> > > I wrote my plugin using
> > > "org.apache.activemq.security.AuthorizationBroker" as a base and aside
> > > from this and single problem with MQTT protocol it works fine.
> > >
> > > What is the reason for this topic's existence?
> > > Why is its name so weird?
> > > Should I expect more destinations with such names?
> > > Is it enough to check for exactly this name and I will be done? I did
> > > it and it works, but I can't be sure that it won't happen again with
> > > different weird topic name.
> > >
> > > Thanks in advance,
> > > --
> > > Jędrzej Dudkiewicz
> > >
> > > I really hate this damn machine, I wish that they would sell it.
> > > It never does just what I want, but only what I tell it.
> > >
> >



--
Jędrzej Dudkiewicz

I really hate this damn machine, I wish that they would sell it.
It never does just what I want, but only what I tell it.
Reply | Threaded
Open this post in threaded view
|

Re: Authorization plugin - don't know what to do with client attempting to write to weird advisory topic

tabish121@gmail.com
On 9/26/19 10:22 AM, Jędrzej Dudkiewicz wrote:
> On Thu, Sep 26, 2019 at 3:01 PM Tim Bain <[hidden email]> wrote:
>> Chris,
>>
>> The problem he's pointing out is that the destination name is the
>> comma-separated concatenation of two advisory topic names. It feels like
>> there's a missing String.split() call earlier in the code path.
> Precisely. I'm aware of advisory messages and in fact I want to use
> them and all would be fine if it weren't for those pesky unsplit
> names.

This appears to be perfectly normal behavior for a composite destination
subscription, the client is asking for both temp queue and temp topic
subscription here:
https://github.com/apache/activemq/blob/master/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java#L45

The definition is here:

https://github.com/apache/activemq/blob/master/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java#L77


>> I haven't had time to go looking for where that might be, and won't for
>> at least a week, though if someone else has time to do it, that would
>> be great. Jędrzej, could you please capture a stack trace at the point
>> where you're seeing this behavior, to help whoever investigates this?
> Below is log (TRACE level, from "bin/activemq console") resulting from
> connecting using client below. Only part after everything settled is
> included:
>
> DEBUG | Checkpoint started.
> DEBUG | Checkpoint done.
> DEBUG | Checkpoint started.
> TRACE | Last update: 1:60346, full gc candidates set: [1]
> TRACE | gc candidates after producerSequenceIdTrackerLocation:1:59943, []
> TRACE | gc candidates after ackMessageFileMapLocation:1:60346, []
> TRACE | gc candidates after in progress tx range:[null, null], []
> TRACE | gc candidates: []
> TRACE | ackMessageFileMap: {}
> TRACE | Not yet time to check for compaction: 1 of 10 cycles
> DEBUG | Checkpoint done.
> TRACE | Execute[ActiveMQ BrokerService[localhost] Task] runnable:
> org.apache.activemq.broker.TransportConnector$1$1@4629be82
> TRACE | Created thread[ActiveMQ BrokerService[localhost] Task-1]:
> Thread[ActiveMQ BrokerService[localhost] Task-1,5,main]
> TRACE | Starting connection check task for: ssl:///127.0.0.1:38422
> TRACE | TCP consumer thread for ssl:///127.0.0.1:38422 starting
> DEBUG | Sending: WireFormatInfo { version=12,
> properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true,
> PlatformDetails=Java, CacheEnabled=true, TightEncodingEnabled=true,
> MaxFrameSize=104857600, MaxInactivityDuration=30000,
> MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.10},
> magic=[A,c,t,i,v,e,M,Q]}
> TRACE | Created thread[ActiveMQ BrokerService[localhost] Task-2]:
> Thread[ActiveMQ BrokerService[localhost] Task-2,5,main]
> TRACE | Stopping connection check task for: ssl:///127.0.0.1:38422
> TRACE | Running task iteration 0 - Transport Connection to:
> tcp://127.0.0.1:38422
> DEBUG | Using min of local: WireFormatInfo { version=12,
> properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true,
> PlatformDetails=Java, CacheEnabled=true, TightEncodingEnabled=true,
> MaxFrameSize=104857600, MaxInactivityDuration=30000,
> MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.10},
> magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=3,
> properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true,
> TightEncodingEnabled=true, MaxInactivityDuration=30000,
> MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
> DEBUG | Received WireFormat: WireFormatInfo { version=3,
> properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true,
> TightEncodingEnabled=true, MaxInactivityDuration=30000,
> MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
> DEBUG | ssl:///127.0.0.1:38422 before negotiation:
> OpenWireFormat{version=12, cacheEnabled=false,
> stackTraceEnabled=false, tightEncodingEnabled=false,
> sizePrefixDisabled=false, maxFrameSize=104857600}
> DEBUG | ssl:///127.0.0.1:38422 after negotiation:
> OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true,
> tightEncodingEnabled=true, sizePrefixDisabled=false,
> maxFrameSize=104857600}
> TRACE | Running task iteration 1 - Transport Connection to:
> tcp://127.0.0.1:38422
> TRACE | Run task done: Transport Connection to: tcp://127.0.0.1:38422
> DEBUG | Setting up new connection id:
> ID:gojira-34271-1569506356763-0:0, address: tcp://127.0.0.1:38422,
> info: ConnectionInfo {commandId = 1, responseRequired = true,
> connectionId = ID:gojira-34271-1569506356763-0:0, clientId = abc,
> clientIp = null, userName = null, password = *****, brokerPath = null,
> brokerMasterConnector = false, manageable = true, clientMaster = true,
> faultTolerant = false, failoverReconnect = false}
> DEBUG | add conn: abc/tcp://127.0.0.1:38422/null
>   INFO | Allowing client:
> zs.node.service.P-00666/7a1e7236-dfb7-11e9-ae1a-47ac10dfc31e
> DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> DEBUG | add destination: abc/topic://ActiveMQ.Advisory.Connection
> DEBUG | localhost adding destination: topic://ActiveMQ.Advisory.Connection
> TRACE | Running task iteration 0 - Transport Connection to:
> tcp://127.0.0.1:38422
> TRACE | Running task iteration 1 - Transport Connection to:
> tcp://127.0.0.1:38422
> TRACE | Run task done: Transport Connection to: tcp://127.0.0.1:38422
> WARN | CHECK CAN READ: zs.node.service.P-00666
> DEST: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> IS TOPIC: true
> PHYSNAME: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> DEST AS STR: Topic
> QUALIFIED NAME:
> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> DEST PATHS: topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> IS TEMP: false
> java.lang.RuntimeException: got concatenated destination physical name
>      at zs.comm.amq.auth.CertAuthPlugin.checkCanRead(CertAuthPlugin.java:156)
>      at zs.comm.amq.auth.CertAuthPlugin.addConsumer(CertAuthPlugin.java:46)
>      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
>      at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)
>      at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)
>      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)
>      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
>      at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
>      at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
>      at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
>      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>      at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)
>      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
>      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
>      at java.lang.Thread.run(Thread.java:748)
> DEBUG | Error occured while processing sync command: ConsumerInfo
> {commandId = 2, responseRequired = true, consumerId =
> ID:gojira-34271-1569506356763-0:0:-1:1, destination =
> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic,
> prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false,
> dispatchAsync = false, selector = null, clientId = null,
> subscriptionName = null, noLocal = true, exclusive = false,
> retroactive = false, priority = 0, brokerPath = null,
> optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate
> = null, networkConsumerIds = null}, exception:
> java.lang.RuntimeException: got concatenated destination physical name
> java.lang.RuntimeException: got concatenated destination physical name
>      at zs.comm.amq.auth.CertAuthPlugin.checkCanRead(CertAuthPlugin.java:156)[auth-0.0.1-SNAPSHOT.jar:]
>      at zs.comm.amq.auth.CertAuthPlugin.addConsumer(CertAuthPlugin.java:46)[auth-0.0.1-SNAPSHOT.jar:]
>      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)[activemq-broker-5.15.10.jar:5.15.10]
>      at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)[activemq-broker-5.15.10.jar:5.15.10]
>      at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.10.jar:5.15.10]
>      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)[activemq-client-5.15.10.jar:5.15.10]
>      at java.lang.Thread.run(Thread.java:748)[:1.8.0_222]
> TRACE | Shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@55228189[Running, pool size =
> 0, active threads = 0, queued tasks = 0, completed tasks = 0] with
> await termination: 10000 millis
> DEBUG | Shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@55228189[Terminated, pool size
> = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is
> shutdown: true and terminated: true took: 0.002 seconds.
> DEBUG | Transport Connection to: tcp://127.0.0.1:38422 failed:
> java.io.EOFException
> java.io.EOFException
>      at java.io.DataInputStream.readInt(DataInputStream.java:392)[:1.8.0_222]
>      at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)[activemq-client-5.15.10.jar:5.15.10]
>      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)[activemq-client-5.15.10.jar:5.15.10]
>      at java.lang.Thread.run(Thread.java:748)[:1.8.0_222]
> TRACE | Execute[ActiveMQ BrokerService[localhost] Task] runnable:
> org.apache.activemq.broker.TransportConnection$4@7e75966d
> DEBUG | Unregistering MBean
> org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire,connectionViewType=clientId,connectionName=abc
> DEBUG | Unregistering MBean
> org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire,connectionViewType=remoteAddress,connectionName=tcp_//127.0.0.1_38422
> DEBUG | Stopping connection: tcp://127.0.0.1:38422
> DEBUG | Stopping transport ssl:///127.0.0.1:38422
> DEBUG | Initialized TaskRunnerFactory[ActiveMQ Task] using
> ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Running, pool size =
> 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> TRACE | Execute[ActiveMQ Task] runnable:
> org.apache.activemq.transport.tcp.TcpTransport$1@78a078c4
> TRACE | Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
> TRACE | Closing socket 40b115e[TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384:
> Socket[addr=/127.0.0.1,port=38422,localport=61616]]
> DEBUG | Closed socket 40b115e[TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384:
> Socket[addr=/127.0.0.1,port=38422,localport=61616]]
> DEBUG | Forcing shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Running, pool size =
> 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> TRACE | Shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Shutting down, pool
> size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> is shutdown: true and terminated: false.
> DEBUG | Stopped transport: tcp://127.0.0.1:38422
> TRACE | Shutdown timeout: 1 task: Transport Connection to: tcp://127.0.0.1:38422
> DEBUG | Cleaning up connection resources: tcp://127.0.0.1:38422
> DEBUG | remove connection id: ID:gojira-34271-1569506356763-0:0
> DEBUG | Remove connection: abc/java.io.EOFException
> DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> DEBUG | add destination: abc/topic://ActiveMQ.Advisory.Connection
> DEBUG | Connection Stopped: tcp://127.0.0.1:38422
> DEBUG | Checkpoint started.
> DEBUG | Checkpoint done.
> ^C INFO | Apache ActiveMQ 5.15.10 (localhost,
> ID:gojira-40629-1569506325958-0:1) is shutting down
> DEBUG | Caught exception, must be shutting down. This exception is ignored.
> java.lang.IllegalStateException: Shutdown in progress
>      at java.lang.ApplicationShutdownHooks.remove(ApplicationShutdownHooks.java:82)[:1.8.0_222]
>      at java.lang.Runtime.removeShutdownHook(Runtime.java:239)[:1.8.0_222]
>      at org.apache.activemq.broker.BrokerService.removeShutdownHook(BrokerService.java:2551)[activemq-broker-5.15.10.jar:5.15.10]
>      at org.apache.activemq.broker.BrokerService.stop(BrokerService.java:838)[activemq-broker-5.15.10.jar:5.15.10]
>      at org.apache.activemq.xbean.XBeanBrokerService.stop(XBeanBrokerService.java:122)[activemq-spring-5.15.10.jar:5.15.10]
>      at org.apache.activemq.broker.BrokerService.containerShutdown(BrokerService.java:2574)[activemq-broker-5.15.10.jar:5.15.10]
>      at org.apache.activemq.broker.BrokerService$7.run(BrokerService.java:2541)[activemq-broker-5.15.10.jar:5.15.10]
> DEBUG | Unregistering MBean
> org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire
>
>
>
> There is more, but it relates to broker shutting down.
>
> Below is code of Java client used:
>
>
> **************** FILE App.java *******************
> package jd.test.jmstest;
>
> import java.io.FileInputStream;
> import java.security.KeyStore;
> import java.security.SecureRandom;
>
> import javax.jms.Session;
> import javax.net.ssl.KeyManager;
> import javax.net.ssl.KeyManagerFactory;
> import javax.net.ssl.TrustManager;
> import javax.net.ssl.TrustManagerFactory;
>
> import org.apache.activemq.ActiveMQConnection;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.ActiveMQSession;
> import org.apache.activemq.ActiveMQSslConnectionFactory;
> import org.apache.activemq.command.ActiveMQTopic;
>
> public class App {
>      public static void main(String[] args) throws Throwable {
>          System.out.println("Hello World!");
>          ActiveMQSslConnectionFactory f = new ActiveMQSslConnectionFactory();
>          f.setBrokerURL("ssl://localhost:61616");
>          f.setClientID("abc");
>          f.setKeyAndTrustManagers(getKeyManager(), getTrustManager(),
> new SecureRandom());
>          ActiveMQConnection conn = (ActiveMQConnection) f.createConnection();
>          System.out.println(conn.getClientID());
>          System.out.println(conn.getClass());
>          ActiveMQSession s = (ActiveMQSession)
> conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>          ActiveMQMessageConsumer cons2 =
> (ActiveMQMessageConsumer)s.createConsumer(new
> ActiveMQTopic("inbound"));
>          cons2.setMessageListener(new InboundMessageListener());
>          conn.start();
>      }
>
>      private static TrustManager[] getTrustManager() throws Throwable {
>          KeyStore ts = KeyStore.getInstance("JKS");
>          ts.load(new FileInputStream("trust.jks"), "trustpass".toCharArray());
>          TrustManagerFactory tm =
> TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
>          tm.init(ts);
>          return tm.getTrustManagers();
>      }
>
>      private static KeyManager[] getKeyManager() throws Throwable {
>          KeyStore ts = KeyStore.getInstance("JKS");
>          ts.load(new FileInputStream("svc01.jks"), "svcpass".toCharArray());
>          KeyManagerFactory km =
> KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
>          km.init(ts, "svcpass".toCharArray());
>          return km.getKeyManagers();
>      }
> }
>
> ******************* FILE InboundMessageListener.java ************
> package jd.test.jmstest;
>
> import java.nio.charset.StandardCharsets;
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageListener;
>
> import org.apache.activemq.command.ActiveMQBytesMessage;
>
> final class InboundMessageListener implements MessageListener {
>      public void onMessage(Message message) {
>          ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message;
>          try {
>              byte[] arr = new byte[(int) msg.getBodyLength()];
>              msg.readBytes(arr);
>              System.out.println("INBOUND MSG: " + new String(arr,
> StandardCharsets.UTF_8));
>          } catch (JMSException e) {
>              e.printStackTrace();
>          }
>      }
> }
>
> *************************** END **********************
>
> I suppose same thing happens without SSL, but I include everything in
> case it is something on my side - I do not include plugin code as
> simplest plugin below receives same topic name:
>
> ******************* FILE CertAuthPlugin2.java *****************
> package zs.comm.amq.auth;
>
> import org.apache.activemq.broker.Broker;
> import org.apache.activemq.broker.BrokerFilter;
> import org.apache.activemq.broker.ConnectionContext;
> import org.apache.activemq.broker.region.Subscription;
> import org.apache.activemq.command.ActiveMQDestination;
> import org.apache.activemq.command.ConsumerInfo;
> import org.apache.log4j.Logger;
>
> public class CertAuthPlugin2 extends BrokerFilter {
>
>      private final Logger l = Logger.getLogger(getClass());
>
>      public CertAuthPlugin2(Broker borker) {
>          super(borker);
>      }
>
>      @Override
>      public Subscription addConsumer(ConnectionContext context,
> ConsumerInfo info) throws Exception {
>          if (!checkCanRead(info.getDestination())) {
>          }
>          return super.addConsumer(context, info);
>      }
>
>      protected boolean checkCanRead(ActiveMQDestination destination) {
>          l.warn("CHECK CAN READ: \nDEST: " + destination + "\nIS TOPIC: "
>                  + destination.isTopic() + "\nPHYSNAME: " +
> destination.getPhysicalName() + "\nDEST AS STR: "
>                  + destination.getDestinationTypeAsString() +
> "\nQUALIFIED NAME: " + destination.getQualifiedName()
>                  + "\nDEST PATHS: " + String.join("|",
> destination.getDestinationPaths()) + "\nIS TEMP: "
>                  + destination.isTemporary());
>          if (destination.isTopic() && destination.getPhysicalName()
>
> .equals("topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic"))
> {
>              try {
>                      throw new RuntimeException("got concatenated
> destination physical name");
>              } catch (Exception e) {
>                  e.printStackTrace();
>                  throw e;
>              }
>          }
>          return true;
>      }
> }
>
> ******************** END ******************************
>
> And log (this time only WARN level, but stacktrace is there):
>
>   WARN | CHECK CAN READ:
> DEST: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> IS TOPIC: true
> PHYSNAME: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> DEST AS STR: Topic
> QUALIFIED NAME:
> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> DEST PATHS: topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> IS TEMP: false
> java.lang.RuntimeException: got concatenated destination physical name
>      at zs.comm.amq.auth.CertAuthPlugin2.checkCanRead(CertAuthPlugin2.java:44)
>      at zs.comm.amq.auth.CertAuthPlugin2.addConsumer(CertAuthPlugin2.java:30)
>      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
>      at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)
>      at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)
>      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)
>      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
>      at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
>      at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
>      at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
>      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>      at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)
>      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
>      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
>      at java.lang.Thread.run(Thread.java:748)
>
>
>
> I hope that it is enough to get someone started.
>
> JD
>
>
>> Tim
>>
>> On Thu, Sep 26, 2019, 4:26 AM Christopher Shannon <
>> [hidden email]> wrote:
>>
>>> These are advisory messages, I suggest you read
>>> https://activemq.apache.org/advisory-message.html which will explain what
>>> they are and how to configure/disable them.
>>>
>>> On Tue, Sep 24, 2019 at 4:52 PM Jędrzej Dudkiewicz <
>>> [hidden email]> wrote:
>>>
>>>> I wrote yet another certificate-based authentication/authorization
>>>> plugin. It reads CN from certificate provided by the client and
>>>> depending on what is in there it allows or dissalows
>>>> creation/deletion/writing to topics/queues. To test it I wrote a
>>>> simple Java client that uses OpenWire. Connection is made
>>>> successfully, CN is read and permissions are added are expected.
>>>> Unfortunately after connection is created (addConnection in plugin is
>>>> called), destination "topic://ActiveMQ.Advisory.Connection" is created
>>>> by the client, or rather in its context (ok, unexpected, but it makes
>>>> sense as I used advisory topics), method addConsumer() (from
>>>> BrokerFilter) is called, and its destination is:
>>>>
>>>> DEST:
>>>> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
>>>> IS TOPIC: true
>>>> PHYSNAME:
>>>> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
>>>> DEST AS STR: Topic
>>>> QUALIFIED NAME:
>>>> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
>>>> DEST PATHS:
>>>> topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
>>>> IS TEMP: false
>>>>
>>>> This means that it is a topic, its physical name is equal to qualified
>>>> name, which makes no sense whatsoever, what more this name is made up
>>>> from two topics names, it isn't temporary (not what name suggests) and
>>>> to add insult to the injury its "getDestinationPaths()" method returns
>>>> array with the following elements:
>>>> "topic://ActiveMQ"
>>>> "Advisory"
>>>> "TempQueue,topic://ActiveMQ"
>>>> "Advisory"
>>>> "TempTopic"
>>>>
>>>> I wrote my plugin using
>>>> "org.apache.activemq.security.AuthorizationBroker" as a base and aside
>>>> from this and single problem with MQTT protocol it works fine.
>>>>
>>>> What is the reason for this topic's existence?
>>>> Why is its name so weird?
>>>> Should I expect more destinations with such names?
>>>> Is it enough to check for exactly this name and I will be done? I did
>>>> it and it works, but I can't be sure that it won't happen again with
>>>> different weird topic name.
>>>>
>>>> Thanks in advance,
>>>> --
>>>> Jędrzej Dudkiewicz
>>>>
>>>> I really hate this damn machine, I wish that they would sell it.
>>>> It never does just what I want, but only what I tell it.
>>>>
>
>

--
Tim Bish

Reply | Threaded
Open this post in threaded view
|

Re: Authorization plugin - don't know what to do with client attempting to write to weird advisory topic

Jędrzej Dudkiewicz
On Thu, Sep 26, 2019 at 4:34 PM Timothy Bish <[hidden email]> wrote:

>
> On 9/26/19 10:22 AM, Jędrzej Dudkiewicz wrote:
> > On Thu, Sep 26, 2019 at 3:01 PM Tim Bain <[hidden email]> wrote:
> >> Chris,
> >>
> >> The problem he's pointing out is that the destination name is the
> >> comma-separated concatenation of two advisory topic names.
>
> This appears to be perfectly normal behavior for a composite destination
> subscription, the client is asking for both temp queue and temp topic
> subscription here:
> https://github.com/apache/activemq/blob/master/activemq-client/src/main/java/org/apache/activemq/AdvisoryConsumer.java#L45
>
> The definition is here:
>
> https://github.com/apache/activemq/blob/master/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java#L77

Thanks Tim.

One more question: is it safe/future proof if I simply split(',') this
name? Or are there some caveats like secret wildcards or some such? If
spit is sufficient then it would be great, later I can simply add
pattern for such topics/queues and be done with them.

JD

>
>
> >> I haven't had time to go looking for where that might be, and won't for
> >> at least a week, though if someone else has time to do it, that would
> >> be great. Jędrzej, could you please capture a stack trace at the point
> >> where you're seeing this behavior, to help whoever investigates this?
> > Below is log (TRACE level, from "bin/activemq console") resulting from
> > connecting using client below. Only part after everything settled is
> > included:
> >
> > DEBUG | Checkpoint started.
> > DEBUG | Checkpoint done.
> > DEBUG | Checkpoint started.
> > TRACE | Last update: 1:60346, full gc candidates set: [1]
> > TRACE | gc candidates after producerSequenceIdTrackerLocation:1:59943, []
> > TRACE | gc candidates after ackMessageFileMapLocation:1:60346, []
> > TRACE | gc candidates after in progress tx range:[null, null], []
> > TRACE | gc candidates: []
> > TRACE | ackMessageFileMap: {}
> > TRACE | Not yet time to check for compaction: 1 of 10 cycles
> > DEBUG | Checkpoint done.
> > TRACE | Execute[ActiveMQ BrokerService[localhost] Task] runnable:
> > org.apache.activemq.broker.TransportConnector$1$1@4629be82
> > TRACE | Created thread[ActiveMQ BrokerService[localhost] Task-1]:
> > Thread[ActiveMQ BrokerService[localhost] Task-1,5,main]
> > TRACE | Starting connection check task for: ssl:///127.0.0.1:38422
> > TRACE | TCP consumer thread for ssl:///127.0.0.1:38422 starting
> > DEBUG | Sending: WireFormatInfo { version=12,
> > properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> > CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true,
> > PlatformDetails=Java, CacheEnabled=true, TightEncodingEnabled=true,
> > MaxFrameSize=104857600, MaxInactivityDuration=30000,
> > MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.10},
> > magic=[A,c,t,i,v,e,M,Q]}
> > TRACE | Created thread[ActiveMQ BrokerService[localhost] Task-2]:
> > Thread[ActiveMQ BrokerService[localhost] Task-2,5,main]
> > TRACE | Stopping connection check task for: ssl:///127.0.0.1:38422
> > TRACE | Running task iteration 0 - Transport Connection to:
> > tcp://127.0.0.1:38422
> > DEBUG | Using min of local: WireFormatInfo { version=12,
> > properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> > CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true,
> > PlatformDetails=Java, CacheEnabled=true, TightEncodingEnabled=true,
> > MaxFrameSize=104857600, MaxInactivityDuration=30000,
> > MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.10},
> > magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=3,
> > properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> > CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true,
> > TightEncodingEnabled=true, MaxInactivityDuration=30000,
> > MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
> > DEBUG | Received WireFormat: WireFormatInfo { version=3,
> > properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false,
> > CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true,
> > TightEncodingEnabled=true, MaxInactivityDuration=30000,
> > MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
> > DEBUG | ssl:///127.0.0.1:38422 before negotiation:
> > OpenWireFormat{version=12, cacheEnabled=false,
> > stackTraceEnabled=false, tightEncodingEnabled=false,
> > sizePrefixDisabled=false, maxFrameSize=104857600}
> > DEBUG | ssl:///127.0.0.1:38422 after negotiation:
> > OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true,
> > tightEncodingEnabled=true, sizePrefixDisabled=false,
> > maxFrameSize=104857600}
> > TRACE | Running task iteration 1 - Transport Connection to:
> > tcp://127.0.0.1:38422
> > TRACE | Run task done: Transport Connection to: tcp://127.0.0.1:38422
> > DEBUG | Setting up new connection id:
> > ID:gojira-34271-1569506356763-0:0, address: tcp://127.0.0.1:38422,
> > info: ConnectionInfo {commandId = 1, responseRequired = true,
> > connectionId = ID:gojira-34271-1569506356763-0:0, clientId = abc,
> > clientIp = null, userName = null, password = *****, brokerPath = null,
> > brokerMasterConnector = false, manageable = true, clientMaster = true,
> > faultTolerant = false, failoverReconnect = false}
> > DEBUG | add conn: abc/tcp://127.0.0.1:38422/null
> >   INFO | Allowing client:
> > zs.node.service.P-00666/7a1e7236-dfb7-11e9-ae1a-47ac10dfc31e
> > DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> > ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> > DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> > ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> > DEBUG | add destination: abc/topic://ActiveMQ.Advisory.Connection
> > DEBUG | localhost adding destination: topic://ActiveMQ.Advisory.Connection
> > TRACE | Running task iteration 0 - Transport Connection to:
> > tcp://127.0.0.1:38422
> > TRACE | Running task iteration 1 - Transport Connection to:
> > tcp://127.0.0.1:38422
> > TRACE | Run task done: Transport Connection to: tcp://127.0.0.1:38422
> > WARN | CHECK CAN READ: zs.node.service.P-00666
> > DEST: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > IS TOPIC: true
> > PHYSNAME: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > DEST AS STR: Topic
> > QUALIFIED NAME:
> > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > DEST PATHS: topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> > IS TEMP: false
> > java.lang.RuntimeException: got concatenated destination physical name
> >      at zs.comm.amq.auth.CertAuthPlugin.checkCanRead(CertAuthPlugin.java:156)
> >      at zs.comm.amq.auth.CertAuthPlugin.addConsumer(CertAuthPlugin.java:46)
> >      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> >      at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)
> >      at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)
> >      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)
> >      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
> >      at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> >      at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
> >      at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
> >      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> >      at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)
> >      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
> >      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
> >      at java.lang.Thread.run(Thread.java:748)
> > DEBUG | Error occured while processing sync command: ConsumerInfo
> > {commandId = 2, responseRequired = true, consumerId =
> > ID:gojira-34271-1569506356763-0:0:-1:1, destination =
> > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic,
> > prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false,
> > dispatchAsync = false, selector = null, clientId = null,
> > subscriptionName = null, noLocal = true, exclusive = false,
> > retroactive = false, priority = 0, brokerPath = null,
> > optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate
> > = null, networkConsumerIds = null}, exception:
> > java.lang.RuntimeException: got concatenated destination physical name
> > java.lang.RuntimeException: got concatenated destination physical name
> >      at zs.comm.amq.auth.CertAuthPlugin.checkCanRead(CertAuthPlugin.java:156)[auth-0.0.1-SNAPSHOT.jar:]
> >      at zs.comm.amq.auth.CertAuthPlugin.addConsumer(CertAuthPlugin.java:46)[auth-0.0.1-SNAPSHOT.jar:]
> >      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)[activemq-broker-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)[activemq-broker-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)[activemq-client-5.15.10.jar:5.15.10]
> >      at java.lang.Thread.run(Thread.java:748)[:1.8.0_222]
> > TRACE | Shutdown of ExecutorService:
> > java.util.concurrent.ThreadPoolExecutor@55228189[Running, pool size =
> > 0, active threads = 0, queued tasks = 0, completed tasks = 0] with
> > await termination: 10000 millis
> > DEBUG | Shutdown of ExecutorService:
> > java.util.concurrent.ThreadPoolExecutor@55228189[Terminated, pool size
> > = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is
> > shutdown: true and terminated: true took: 0.002 seconds.
> > DEBUG | Transport Connection to: tcp://127.0.0.1:38422 failed:
> > java.io.EOFException
> > java.io.EOFException
> >      at java.io.DataInputStream.readInt(DataInputStream.java:392)[:1.8.0_222]
> >      at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)[activemq-client-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)[activemq-client-5.15.10.jar:5.15.10]
> >      at java.lang.Thread.run(Thread.java:748)[:1.8.0_222]
> > TRACE | Execute[ActiveMQ BrokerService[localhost] Task] runnable:
> > org.apache.activemq.broker.TransportConnection$4@7e75966d
> > DEBUG | Unregistering MBean
> > org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire,connectionViewType=clientId,connectionName=abc
> > DEBUG | Unregistering MBean
> > org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire,connectionViewType=remoteAddress,connectionName=tcp_//127.0.0.1_38422
> > DEBUG | Stopping connection: tcp://127.0.0.1:38422
> > DEBUG | Stopping transport ssl:///127.0.0.1:38422
> > DEBUG | Initialized TaskRunnerFactory[ActiveMQ Task] using
> > ExecutorService:
> > java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Running, pool size =
> > 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> > TRACE | Execute[ActiveMQ Task] runnable:
> > org.apache.activemq.transport.tcp.TcpTransport$1@78a078c4
> > TRACE | Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
> > TRACE | Closing socket 40b115e[TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384:
> > Socket[addr=/127.0.0.1,port=38422,localport=61616]]
> > DEBUG | Closed socket 40b115e[TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384:
> > Socket[addr=/127.0.0.1,port=38422,localport=61616]]
> > DEBUG | Forcing shutdown of ExecutorService:
> > java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Running, pool size =
> > 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> > TRACE | Shutdown of ExecutorService:
> > java.util.concurrent.ThreadPoolExecutor@6e50ec0a[Shutting down, pool
> > size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
> > is shutdown: true and terminated: false.
> > DEBUG | Stopped transport: tcp://127.0.0.1:38422
> > TRACE | Shutdown timeout: 1 task: Transport Connection to: tcp://127.0.0.1:38422
> > DEBUG | Cleaning up connection resources: tcp://127.0.0.1:38422
> > DEBUG | remove connection id: ID:gojira-34271-1569506356763-0:0
> > DEBUG | Remove connection: abc/java.io.EOFException
> > DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> > ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> > DEBUG | Publishing: ssl://gojira:61616 for broker transport URI:
> > ssl://gojira:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.needClientAuth=true
> > DEBUG | add destination: abc/topic://ActiveMQ.Advisory.Connection
> > DEBUG | Connection Stopped: tcp://127.0.0.1:38422
> > DEBUG | Checkpoint started.
> > DEBUG | Checkpoint done.
> > ^C INFO | Apache ActiveMQ 5.15.10 (localhost,
> > ID:gojira-40629-1569506325958-0:1) is shutting down
> > DEBUG | Caught exception, must be shutting down. This exception is ignored.
> > java.lang.IllegalStateException: Shutdown in progress
> >      at java.lang.ApplicationShutdownHooks.remove(ApplicationShutdownHooks.java:82)[:1.8.0_222]
> >      at java.lang.Runtime.removeShutdownHook(Runtime.java:239)[:1.8.0_222]
> >      at org.apache.activemq.broker.BrokerService.removeShutdownHook(BrokerService.java:2551)[activemq-broker-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.broker.BrokerService.stop(BrokerService.java:838)[activemq-broker-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.xbean.XBeanBrokerService.stop(XBeanBrokerService.java:122)[activemq-spring-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.broker.BrokerService.containerShutdown(BrokerService.java:2574)[activemq-broker-5.15.10.jar:5.15.10]
> >      at org.apache.activemq.broker.BrokerService$7.run(BrokerService.java:2541)[activemq-broker-5.15.10.jar:5.15.10]
> > DEBUG | Unregistering MBean
> > org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName=openwire
> >
> >
> >
> > There is more, but it relates to broker shutting down.
> >
> > Below is code of Java client used:
> >
> >
> > **************** FILE App.java *******************
> > package jd.test.jmstest;
> >
> > import java.io.FileInputStream;
> > import java.security.KeyStore;
> > import java.security.SecureRandom;
> >
> > import javax.jms.Session;
> > import javax.net.ssl.KeyManager;
> > import javax.net.ssl.KeyManagerFactory;
> > import javax.net.ssl.TrustManager;
> > import javax.net.ssl.TrustManagerFactory;
> >
> > import org.apache.activemq.ActiveMQConnection;
> > import org.apache.activemq.ActiveMQMessageConsumer;
> > import org.apache.activemq.ActiveMQSession;
> > import org.apache.activemq.ActiveMQSslConnectionFactory;
> > import org.apache.activemq.command.ActiveMQTopic;
> >
> > public class App {
> >      public static void main(String[] args) throws Throwable {
> >          System.out.println("Hello World!");
> >          ActiveMQSslConnectionFactory f = new ActiveMQSslConnectionFactory();
> >          f.setBrokerURL("ssl://localhost:61616");
> >          f.setClientID("abc");
> >          f.setKeyAndTrustManagers(getKeyManager(), getTrustManager(),
> > new SecureRandom());
> >          ActiveMQConnection conn = (ActiveMQConnection) f.createConnection();
> >          System.out.println(conn.getClientID());
> >          System.out.println(conn.getClass());
> >          ActiveMQSession s = (ActiveMQSession)
> > conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >          ActiveMQMessageConsumer cons2 =
> > (ActiveMQMessageConsumer)s.createConsumer(new
> > ActiveMQTopic("inbound"));
> >          cons2.setMessageListener(new InboundMessageListener());
> >          conn.start();
> >      }
> >
> >      private static TrustManager[] getTrustManager() throws Throwable {
> >          KeyStore ts = KeyStore.getInstance("JKS");
> >          ts.load(new FileInputStream("trust.jks"), "trustpass".toCharArray());
> >          TrustManagerFactory tm =
> > TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
> >          tm.init(ts);
> >          return tm.getTrustManagers();
> >      }
> >
> >      private static KeyManager[] getKeyManager() throws Throwable {
> >          KeyStore ts = KeyStore.getInstance("JKS");
> >          ts.load(new FileInputStream("svc01.jks"), "svcpass".toCharArray());
> >          KeyManagerFactory km =
> > KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
> >          km.init(ts, "svcpass".toCharArray());
> >          return km.getKeyManagers();
> >      }
> > }
> >
> > ******************* FILE InboundMessageListener.java ************
> > package jd.test.jmstest;
> >
> > import java.nio.charset.StandardCharsets;
> >
> > import javax.jms.JMSException;
> > import javax.jms.Message;
> > import javax.jms.MessageListener;
> >
> > import org.apache.activemq.command.ActiveMQBytesMessage;
> >
> > final class InboundMessageListener implements MessageListener {
> >      public void onMessage(Message message) {
> >          ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message;
> >          try {
> >              byte[] arr = new byte[(int) msg.getBodyLength()];
> >              msg.readBytes(arr);
> >              System.out.println("INBOUND MSG: " + new String(arr,
> > StandardCharsets.UTF_8));
> >          } catch (JMSException e) {
> >              e.printStackTrace();
> >          }
> >      }
> > }
> >
> > *************************** END **********************
> >
> > I suppose same thing happens without SSL, but I include everything in
> > case it is something on my side - I do not include plugin code as
> > simplest plugin below receives same topic name:
> >
> > ******************* FILE CertAuthPlugin2.java *****************
> > package zs.comm.amq.auth;
> >
> > import org.apache.activemq.broker.Broker;
> > import org.apache.activemq.broker.BrokerFilter;
> > import org.apache.activemq.broker.ConnectionContext;
> > import org.apache.activemq.broker.region.Subscription;
> > import org.apache.activemq.command.ActiveMQDestination;
> > import org.apache.activemq.command.ConsumerInfo;
> > import org.apache.log4j.Logger;
> >
> > public class CertAuthPlugin2 extends BrokerFilter {
> >
> >      private final Logger l = Logger.getLogger(getClass());
> >
> >      public CertAuthPlugin2(Broker borker) {
> >          super(borker);
> >      }
> >
> >      @Override
> >      public Subscription addConsumer(ConnectionContext context,
> > ConsumerInfo info) throws Exception {
> >          if (!checkCanRead(info.getDestination())) {
> >          }
> >          return super.addConsumer(context, info);
> >      }
> >
> >      protected boolean checkCanRead(ActiveMQDestination destination) {
> >          l.warn("CHECK CAN READ: \nDEST: " + destination + "\nIS TOPIC: "
> >                  + destination.isTopic() + "\nPHYSNAME: " +
> > destination.getPhysicalName() + "\nDEST AS STR: "
> >                  + destination.getDestinationTypeAsString() +
> > "\nQUALIFIED NAME: " + destination.getQualifiedName()
> >                  + "\nDEST PATHS: " + String.join("|",
> > destination.getDestinationPaths()) + "\nIS TEMP: "
> >                  + destination.isTemporary());
> >          if (destination.isTopic() && destination.getPhysicalName()
> >
> > .equals("topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic"))
> > {
> >              try {
> >                      throw new RuntimeException("got concatenated
> > destination physical name");
> >              } catch (Exception e) {
> >                  e.printStackTrace();
> >                  throw e;
> >              }
> >          }
> >          return true;
> >      }
> > }
> >
> > ******************** END ******************************
> >
> > And log (this time only WARN level, but stacktrace is there):
> >
> >   WARN | CHECK CAN READ:
> > DEST: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > IS TOPIC: true
> > PHYSNAME: topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > DEST AS STR: Topic
> > QUALIFIED NAME:
> > topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> > DEST PATHS: topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> > IS TEMP: false
> > java.lang.RuntimeException: got concatenated destination physical name
> >      at zs.comm.amq.auth.CertAuthPlugin2.checkCanRead(CertAuthPlugin2.java:44)
> >      at zs.comm.amq.auth.CertAuthPlugin2.addConsumer(CertAuthPlugin2.java:30)
> >      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
> >      at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:703)
> >      at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:352)
> >      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)
> >      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)
> >      at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> >      at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
> >      at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
> >      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> >      at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:171)
> >      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
> >      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
> >      at java.lang.Thread.run(Thread.java:748)
> >
> >
> >
> > I hope that it is enough to get someone started.
> >
> > JD
> >
> >
> >> Tim
> >>
> >> On Thu, Sep 26, 2019, 4:26 AM Christopher Shannon <
> >> [hidden email]> wrote:
> >>
> >>> These are advisory messages, I suggest you read
> >>> https://activemq.apache.org/advisory-message.html which will explain what
> >>> they are and how to configure/disable them.
> >>>
> >>> On Tue, Sep 24, 2019 at 4:52 PM Jędrzej Dudkiewicz <
> >>> [hidden email]> wrote:
> >>>
> >>>> I wrote yet another certificate-based authentication/authorization
> >>>> plugin. It reads CN from certificate provided by the client and
> >>>> depending on what is in there it allows or dissalows
> >>>> creation/deletion/writing to topics/queues. To test it I wrote a
> >>>> simple Java client that uses OpenWire. Connection is made
> >>>> successfully, CN is read and permissions are added are expected.
> >>>> Unfortunately after connection is created (addConnection in plugin is
> >>>> called), destination "topic://ActiveMQ.Advisory.Connection" is created
> >>>> by the client, or rather in its context (ok, unexpected, but it makes
> >>>> sense as I used advisory topics), method addConsumer() (from
> >>>> BrokerFilter) is called, and its destination is:
> >>>>
> >>>> DEST:
> >>>> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> >>>> IS TOPIC: true
> >>>> PHYSNAME:
> >>>> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> >>>> DEST AS STR: Topic
> >>>> QUALIFIED NAME:
> >>>> topic://ActiveMQ.Advisory.TempQueue,topic://ActiveMQ.Advisory.TempTopic
> >>>> DEST PATHS:
> >>>> topic://ActiveMQ|Advisory|TempQueue,topic://ActiveMQ|Advisory|TempTopic
> >>>> IS TEMP: false
> >>>>
> >>>> This means that it is a topic, its physical name is equal to qualified
> >>>> name, which makes no sense whatsoever, what more this name is made up
> >>>> from two topics names, it isn't temporary (not what name suggests) and
> >>>> to add insult to the injury its "getDestinationPaths()" method returns
> >>>> array with the following elements:
> >>>> "topic://ActiveMQ"
> >>>> "Advisory"
> >>>> "TempQueue,topic://ActiveMQ"
> >>>> "Advisory"
> >>>> "TempTopic"
> >>>>
> >>>> I wrote my plugin using
> >>>> "org.apache.activemq.security.AuthorizationBroker" as a base and aside
> >>>> from this and single problem with MQTT protocol it works fine.
> >>>>
> >>>> What is the reason for this topic's existence?
> >>>> Why is its name so weird?
> >>>> Should I expect more destinations with such names?
> >>>> Is it enough to check for exactly this name and I will be done? I did
> >>>> it and it works, but I can't be sure that it won't happen again with
> >>>> different weird topic name.
> >>>>
> >>>> Thanks in advance,
> >>>> --
> >>>> Jędrzej Dudkiewicz
> >>>>
> >>>> I really hate this damn machine, I wish that they would sell it.
> >>>> It never does just what I want, but only what I tell it.
> >>>>
> >
> >
>
> --
> Tim Bish
>


--
Jędrzej Dudkiewicz

I really hate this damn machine, I wish that they would sell it.
It never does just what I want, but only what I tell it.