MQTT To JMS: Payload (de)compression

davyv
I'm working on a project where clients connect over MQTT and subscribers connect over JMS to an ActiveMQ instance.

To save data on the wireless 3G link, I compress the payload in the MQTT client, and I'm now trying to find a way to decompress it.
I looked into writing a custom decompression interceptor to deploy in the ActiveMQ instance, but I ran into some issues. I override the 'send' method of BrokerFilter and try to replace the content with a decompressed byte array:

BytesMessage bytesMessage = (BytesMessage) message; // Also tried a copy of the message

ByteArrayOutputStream oStream = new ByteArrayOutputStream(BUFFER_CAPACITY_BYTES);
byte[] buffer = new byte[BUFFER_CAPACITY_BYTES];

int bufferCount = -1;
while ((bufferCount = bytesMessage.readBytes(buffer)) >= 0) {
	oStream.write(buffer, 0, bufferCount);
	if (bufferCount < BUFFER_CAPACITY_BYTES) {
		break;
	}
}
byte[] content = oStream.toByteArray();
oStream.close();

byte[] unzipped = CompressionUtil.decompress(content); // My utility class to decompress a byte array

bytesMessage.clearBody(); // When replacing the content of a text message, this helped avoiding the read-only exception
bytesMessage.writeBytes(unzipped);

And the ActiveMQ configuration:
<plugins>
	<!-- GZip Interceptor -->
	<bean xmlns="http://www.springframework.org/schema/beans" id="gzipInterceptor" class="nl.engiedigital.sds.sandbox.gzipInterceptor.gzipInterceptorPlugin"/>    
</plugins>


I keep getting an exception saying that the message (BytesMessage) body is write-only, so I'm unable to read the content ('bytesMessage.readBytes(buffer)').

Is an interceptor in ActiveMQ the best option for this? If so, how is it best implemented? Does ActiveMQ offer anything else for this, or is the JMS client a better place to decompress?

If I use an interceptor, when exactly is it executed? Are there charts that show the flow through an interceptor? What happens when more than one interceptor is installed, and in what order are they called?

As far as I can tell from the spec, the MQTT protocol has no option to compress the connection. If anyone knows of a standard option for this, please let me know.
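
For readers following along: the CompressionUtil helper used in the snippet above is not shown in the post. The following is only a minimal sketch of what such a helper might look like, assuming the MQTT client compresses with gzip (the plugin name suggests this, but the post does not confirm it). The class name GzipUtil and its methods are hypothetical and use only java.util.zip from the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for the CompressionUtil mentioned in the post.
// Assumption: the payload is gzip-compressed on the MQTT client side.
final class GzipUtil {
    private GzipUtil() {}

    // Compress a byte array with gzip (what the MQTT client would do).
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the GZIPOutputStream writes the gzip trailer.
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decompress a gzip byte array (what the interceptor would do).
    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip =
                 new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int n;
            // read() returns -1 once the compressed stream is exhausted.
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] original = "temperature=21.5;humidity=40".getBytes(StandardCharsets.UTF_8);
        byte[] restored = decompress(compress(original));
        System.out.println(Arrays.equals(original, restored));
    }
}
```

Note that gzip adds a fixed header and trailer, so very small payloads can grow rather than shrink; whether compression pays off depends on the message sizes involved.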

Re: MQTT To JMS: Payload (de)compression

chirino
One way to do it would be to copy the message.  The body on the message
copy should be writable.

On Wednesday, February 24, 2016, davyv <[hidden email]>
wrote:



--
Hiram Chirino
Engineering | Red Hat, Inc.
[hidden email] | fusesource.com | redhat.com
skype: hiramchirino | twitter: @hiramchirino

Re: MQTT To JMS: Payload (de)compression

davyv
Do you mean by calling 'message.copy()'? I tried that, but without success: I get a MessageNotReadableException because the message is in write-only mode rather than read-only mode. But your comment got me thinking, and I was able to solve the write-only issue by calling 'message.reset()' on the BytesMessage. Silly that I did not see that before; it is described in the API docs.

However, the question remains whether this is the correct way to decompress the message. Where in the processing chain is the interceptor inserted? Or is this not what interceptors are intended for? If not, is there another way to do this in ActiveMQ?
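
To illustrate why 'reset()' resolves the exception: a BytesMessage body is either in write-only or in read-only mode, 'reset()' switches it to read-only and repositions it at the start, and 'clearBody()' empties it and returns it to write-only mode. The sketch below models just those rules with a small hypothetical class, BodyModeSketch. It is not the JMS or ActiveMQ API and throws IllegalStateException where a real BytesMessage would throw MessageNotReadableException or MessageNotWriteableException.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical in-memory model of the BytesMessage body modes; NOT the JMS API.
final class BodyModeSketch {
    private ByteArrayOutputStream writeBuffer = new ByteArrayOutputStream();
    private byte[] readable; // non-null only while the body is read-only
    private int position;

    void writeBytes(byte[] data) {
        if (readable != null) throw new IllegalStateException("body is read-only");
        writeBuffer.write(data, 0, data.length);
    }

    // Switch to read-only mode and reposition at the start of the body.
    void reset() {
        if (readable == null) readable = writeBuffer.toByteArray();
        position = 0;
    }

    // Returns the number of bytes copied, or -1 when the body is exhausted.
    int readBytes(byte[] target) {
        if (readable == null) throw new IllegalStateException("body is write-only");
        if (position >= readable.length) return -1;
        int n = Math.min(target.length, readable.length - position);
        System.arraycopy(readable, position, target, 0, n);
        position += n;
        return n;
    }

    // Empty the body and go back to write-only mode.
    void clearBody() {
        writeBuffer = new ByteArrayOutputStream();
        readable = null;
        position = 0;
    }

    // reset() first, then read until -1; no short-read check is needed.
    static byte[] drain(BodyModeSketch message) {
        message.reset();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4];
        int n;
        while ((n = message.readBytes(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        BodyModeSketch message = new BodyModeSketch();
        message.writeBytes(new byte[] {1, 2, 3, 4, 5}); // as filled in by the sender
        byte[] content = drain(message);                // reset, then read
        message.clearBody();                            // back to write-only
        message.writeBytes(content);                    // write the replacement body
        System.out.println(Arrays.equals(content, drain(message)));
    }
}
```

In the interceptor from the first post, the equivalent sequence would be 'reset()', the read loop, 'clearBody()', then 'writeBytes(unzipped)'.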