|
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java Sun Mar 25 06:33:49 2012 @@ -14,30 +14,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.stomp; +package org.apache.activemq.transport.mqtt; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; - -import java.io.*; -import java.util.HashMap; -import java.util.Map; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.mqtt.codec.MQTTFrame; /** * Implements marshalling and unmarsalling the <a - * href="http://stomp.codehaus.org/">Stomp</a> protocol. + * href="http://mqtt.org/">MQTT</a> protocol. */ -public class StompWireFormat implements WireFormat { +public class MQTTWireFormat implements WireFormat { - private static final byte[] NO_DATA = new byte[] {}; - private static final byte[] END_OF_FRAME = new byte[] {0, '\n'}; - private static final int MAX_COMMAND_LENGTH = 1024; - private static final int MAX_HEADER_LENGTH = 1024 * 10; - private static final int MAX_HEADERS = 1000; - private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100; + private static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256; private boolean encodingEnabled = false; private int version = 1; @@ -56,254 +55,70 @@ public class StompWireFormat implements return unmarshal(dis); } - public void marshal(Object command, DataOutput os) throws IOException { - StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command; - - if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) { - os.write(Stomp.BREAK); - return; - } - - StringBuilder buffer = new StringBuilder(); - buffer.append(stomp.getAction()); - buffer.append(Stomp.NEWLINE); - - // Output the headers. - for (Map.Entry<String, String> entry : stomp.getHeaders().entrySet()) { - buffer.append(entry.getKey()); - buffer.append(Stomp.Headers.SEPERATOR); - buffer.append(encodeHeader(entry.getValue())); - buffer.append(Stomp.NEWLINE); + public void marshal(Object command, DataOutput dataOut) throws IOException { + MQTTFrame frame = (MQTTFrame) command; + dataOut.write(frame.header()); + + int remaining = 0; + for (Buffer buffer : frame.buffers) { + remaining += buffer.length; + } + do { + byte digit = (byte) (remaining & 0x7F); + remaining >>>= 7; + if (remaining > 0) { + digit |= 0x80; + } + dataOut.write(digit); + } while (remaining > 0); + for (Buffer buffer : frame.buffers) { + dataOut.write(buffer.data, buffer.offset, buffer.length); } - - // Add a newline to seperate the headers from the content. - buffer.append(Stomp.NEWLINE); - - os.write(buffer.toString().getBytes("UTF-8")); - os.write(stomp.getContent()); - os.write(END_OF_FRAME); } - public Object unmarshal(DataInput in) throws IOException { - - try { - - // parse action - String action = parseAction(in); - - // Parse the headers - HashMap<String, String> headers = parseHeaders(in); - - // Read in the data part. - byte[] data = NO_DATA; - String contentLength = headers.get(Stomp.Headers.CONTENT_LENGTH); - if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLength != null) { - - // Bless the client, he's telling us how much data to read in. - int length = parseContentLength(contentLength); - - data = new byte[length]; - in.readFully(data); - - if (in.readByte() != 0) { - throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true); - } - + public Object unmarshal(DataInput dataIn) throws IOException { + byte header = dataIn.readByte(); + + byte digit = 0; + + int multiplier = 1; + int length = 0; + do { + digit = dataIn.readByte(); + length += (digit & 0x7F) * multiplier; + multiplier <<= 7; + } + while ((digit & 0x80) != 0); + if (length >= 0) { + if (length > MAX_MESSAGE_LENGTH) { + throw new IOException("The maximum message length was exceeded"); + } + + if (length > 0) { + byte[] data = new byte[length]; + dataIn.readFully(data); + Buffer body = new Buffer(data); + return new MQTTFrame(body).header(header); } else { - - // We don't know how much to read.. data ends when we hit a 0 - byte b; - ByteArrayOutputStream baos = null; - while ((b = in.readByte()) != 0) { - - if (baos == null) { - baos = new ByteArrayOutputStream(); - } else if (baos.size() > MAX_DATA_LENGTH) { - throw new ProtocolException("The maximum data length was exceeded", true); - } - - baos.write(b); - } - - if (baos != null) { - baos.close(); - data = baos.toByteArray(); - } + return new MQTTFrame().header(header); } - - return new StompFrame(action, headers, data); - - } catch (ProtocolException e) { - return new StompFrameError(e); } + return null; } - private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException { - ByteSequence sequence = readHeaderLine(in, maxLength, errorMessage); - return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8").trim(); - } - - private ByteSequence readHeaderLine(DataInput in, int maxLength, String errorMessage) throws IOException { - byte b; - ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength); - while ((b = in.readByte()) != '\n') { - if (baos.size() > maxLength) { - throw new ProtocolException(errorMessage, true); - } - baos.write(b); - } - baos.close(); - return baos.toByteSequence(); - } - - protected String parseAction(DataInput in) throws IOException { - String action = null; - - // skip white space to next real action line - while (true) { - action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); - if (action == null) { - throw new IOException("connection was closed"); - } else { - action = action.trim(); - if (action.length() > 0) { - break; - } - } - } - return action; - } - - protected HashMap<String, String> parseHeaders(DataInput in) throws IOException { - HashMap<String, String> headers = new HashMap<String, String>(25); - while (true) { - ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); - if (line != null && line.length > 1) { - - if (headers.size() > MAX_HEADERS) { - throw new ProtocolException("The maximum number of headers was exceeded", true); - } - - try { - - ByteArrayInputStream headerLine = new ByteArrayInputStream(line); - ByteArrayOutputStream stream = new ByteArrayOutputStream(line.length); - - // First complete the name - int result = -1; - while ((result = headerLine.read()) != -1) { - if (result != ':') { - stream.write(result); - } else { - break; - } - } - - ByteSequence nameSeq = stream.toByteSequence(); - String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8").trim(); - String value = decodeHeader(headerLine).trim(); - headers.put(name, value); - } catch (Exception e) { - throw new ProtocolException("Unable to parser header line [" + line + "]", true); - } - } else { - break; - } - } - return headers; - } - - protected int parseContentLength(String contentLength) throws ProtocolException { - int length; - try { - length = Integer.parseInt(contentLength.trim()); - } catch (NumberFormatException e) { - throw new ProtocolException("Specified content-length is not a valid integer", true); - } - - if (length > MAX_DATA_LENGTH) { - throw new ProtocolException("The maximum data length was exceeded", true); - } - - return length; - } - - private String encodeHeader(String header) throws IOException { - String result = header; - if (this.encodingEnabled) { - byte[] utf8buf = header.getBytes("UTF-8"); - ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length); - for(byte val : utf8buf) { - switch(val) { - case Stomp.ESCAPE: - stream.write(Stomp.ESCAPE_ESCAPE_SEQ); - break; - case Stomp.BREAK: - stream.write(Stomp.NEWLINE_ESCAPE_SEQ); - break; - case Stomp.COLON: - stream.write(Stomp.COLON_ESCAPE_SEQ); - break; - default: - stream.write(val); - } - } - result = new String(stream.toByteArray(), "UTF-8"); - } - - return result; - } - - private String decodeHeader(InputStream header) throws IOException { - ByteArrayOutputStream decoded = new ByteArrayOutputStream(); - PushbackInputStream stream = new PushbackInputStream(header); - - int value = -1; - while( (value = stream.read()) != -1) { - if (value == 92) { - - int next = stream.read(); - if (next != -1) { - switch(next) { - case 110: - decoded.write(Stomp.BREAK); - break; - case 99: - decoded.write(Stomp.COLON); - break; - case 92: - decoded.write(Stomp.ESCAPE); - break; - default: - stream.unread(next); - decoded.write(value); - } - } else { - decoded.write(value); - } - - } else { - decoded.write(value); - } - } - - return new String(decoded.toByteArray(), "UTF-8"); - } - - public int getVersion() { - return version; - } - + /** + * @param the version of the wire format + */ public void setVersion(int version) { this.version = version; } - public boolean isEncodingEnabled() { - return this.encodingEnabled; + /** + * @return the version of the wire format + */ + public int getVersion() { + return this.version; } - public void setEncodingEnabled(boolean value) { - this.encodingEnabled = value; - } } Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java Sun Mar 25 06:33:49 2012 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.stomp; +package org.apache.activemq.transport.mqtt; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; @@ -22,8 +22,8 @@ import org.apache.activemq.wireformat.Wi /** * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol. */ -public class StompWireFormatFactory implements WireFormatFactory { +public class MQTTWireFormatFactory implements WireFormatFactory { public WireFormat createWireFormat() { - return new StompWireFormat(); + return new MQTTWireFormat(); } } Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java Sun Mar 25 06:33:49 2012 @@ -14,17 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.stomp; +package org.apache.activemq.transport.mqtt; import java.io.IOException; import org.apache.activemq.command.Response; + /** - * Interface used by the ProtocolConverter for callbacks. - * - * @author <a href="http://hiramchirino.com">chirino</a> + * Interface used by the MQTTProtocolConverter for callbacks. */ interface ResponseHandler { - void onResponse(ProtocolConverter converter, Response response) throws IOException; + void onResponse(MQTTProtocolConverter converter, Response response) throws IOException; } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java?rev=1304984&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java Sun Mar 25 06:33:49 2012 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +public class WildCardConvertor { + + static String convertActiveMQToMQTT(String name) { + String result = name.replaceAll("#", ">"); + result = result.replaceAll("+", "*"); + result = result.replaceAll("/", "."); + return result; + } + + static String convertMQTTToActiveMQ(String name) { + String result = name.replaceAll(">", "#"); + result = result.replaceAll("*", "+"); + result = result.replaceAll(".", "/"); + return result; + } +} Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html Sun Mar 25 06:33:49 2012 @@ -19,8 +19,7 @@ </head> <body> -An implementation of the Stomp protocol which is a simple wire protocol for writing clients for ActiveMQ in different -languages like Ruby, Python, PHP, C etc. +An implementation of the MQTT 3.1 protocol - see http://mqtt.org/ </body> </html> Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt (from r1303689, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp (original) +++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt Sun Mar 25 06:33:49 2012 @@ -14,4 +14,4 @@ ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- -class=org.apache.activemq.transport.stomp.StompTransportFactory +class=org.apache.activemq.transport.mqtt.MQTTTransportFactory Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt (from r1303689, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp (original) +++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt Sun Mar 25 06:33:49 2012 @@ -14,4 +14,4 @@ ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- -class=org.apache.activemq.transport.stomp.StompWireFormatFactory \ No newline at end of file +class=org.apache.activemq.transport.mqtt.MQTTWireFormatFactory \ No newline at end of file Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java (from r1303689, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java Sun Mar 25 06:33:49 2012 @@ -14,29 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.stomp; +package org.apache.activemq.transport.mqtt; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.util.Vector; -import javax.net.ServerSocketFactory; -import org.apache.activemq.broker.BrokerPlugin; + import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.security.JaasDualAuthenticationPlugin; -import org.apache.activemq.util.Wait; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static junit.framework.Assert.assertTrue; - // https://issues.apache.org/jira/browse/AMQ-3393 -public class ConnectTest { - private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class); +public class MQTTConnectTest { + private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class); BrokerService brokerService; Vector<Throwable> exceptions = new Vector<Throwable>(); @@ -56,123 +49,18 @@ public class ConnectTest { } @Test - public void testStompConnectLeak() throws Exception { + public void testConnect() throws Exception { - brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0"); + brokerService.addConnector("mqtt://localhost:1883"); brokerService.start(); - - Thread t1 = new Thread() { - StompConnection connection = new StompConnection(); - - public void run() { - try { - connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort()); - connection.connect("system", "manager"); - connection.disconnect(); - } catch (Exception ex) { - LOG.error("unexpected exception on connect/disconnect", ex); - exceptions.add(ex); - } - } - }; - - int i = 0; - long done = System.currentTimeMillis() + (15 * 1000); - while (System.currentTimeMillis() < done) { - t1.run(); - if (++i % 5000 == 0) { - LOG.info("connection count on stomp connector:" + brokerService.getTransportConnectors().get(0).connectionCount()); - } - } - - assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 0 == brokerService.getTransportConnectors().get(0).connectionCount(); - } - })); - assertTrue("no exceptions", exceptions.isEmpty()); + MQTT mqtt = new MQTT(); + mqtt.setHost("localhost",1883); + BlockingConnection connection = mqtt.blockingConnection(); + + connection.connect(); + Thread.sleep(1000); + connection.disconnect(); } - @Test - public void testJaasDualStopWithOpenConnection() throws Exception { - - brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()}); - brokerService.addConnector("stomp://0.0.0.0:0?transport.closeAsync=false"); - brokerService.start(); - - final int listenPort = brokerService.getTransportConnectors().get(0).getConnectUri().getPort(); - Thread t1 = new Thread() { - StompConnection connection = new StompConnection(); - - public void run() { - try { - connection.open("localhost", listenPort); - connection.connect("system", "manager"); - } catch (Exception ex) { - LOG.error("unexpected exception on connect/disconnect", ex); - exceptions.add(ex); - } - } - }; - - t1.run(); - - assertTrue("one connection", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); - } - })); - - brokerService.stop(); - - // server socket should be available after stop - ServerSocket socket = ServerSocketFactory.getDefault().createServerSocket(); - socket.setReuseAddress(true); - InetAddress address = InetAddress.getLocalHost(); - socket.bind(new InetSocketAddress(address, listenPort)); - LOG.info("bound address: " + socket); - socket.close(); - assertTrue("no exceptions", exceptions.isEmpty()); - } - - @Test - public void testInactivityMonitor() throws Exception { - - brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false"); - brokerService.start(); - - Thread t1 = new Thread() { - StompConnection connection = new StompConnection(); - - public void run() { - try { - connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort()); - connection.connect("system", "manager"); - } catch (Exception ex) { - LOG.error("unexpected exception on connect/disconnect", ex); - exceptions.add(ex); - } - } - }; - - t1.run(); - - assertTrue("one connection", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 1 == brokerService.getTransportConnectors().get(0).connectionCount(); - } - })); - - // and it should be closed due to inactivity - assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 0 == brokerService.getTransportConnectors().get(0).connectionCount(); - } - })); - assertTrue("no exceptions", exceptions.isEmpty()); - } + } \ No newline at end of file Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (from r1303689, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java&r1=1303689&r2=1304984&rev=1304984&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Sun Mar 25 06:33:49 2012 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.stomp; +package org.apache.activemq.transport.mqtt; import java.io.IOException; import java.net.Socket; @@ -41,7 +41,6 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; @@ -50,14 +49,18 @@ import org.apache.activemq.broker.jmx.Br import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.transport.stomp.SamplePojo; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.transport.stomp.StompFrame; import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StompTest extends CombinationTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(StompTest.class); +public class MQTTTest extends CombinationTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); - protected String bindAddress = "stomp://localhost:61613"; + protected String bindAddress = "mqtt://localhost:1883"; protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; protected String jmsUri = "vm://localhost"; |
| Powered by Nabble | Edit this page |
