Unrecognized Data Type: 51 JDBCPersistenceAdapter getLastMessageBrokerSequenceId

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Unrecognized Data Type: 51 JDBCPersistenceAdapter getLastMessageBrokerSequenceId

hseshadr
This post has NOT been accepted by the mailing list yet.
I am using ActiveMQ 5.5.1 with Spring 3.1 in embedded mode and am experiencing errors.
I am able to send messages to the queue the first time, but then it starts failing with the following exception:

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name
'messageBrokerService' defined in class path resource [gr-service_applicationContext.xml]: Cannot resolve reference to
bean 'messageBroker' while setting bean property 'messageBroker'; nested exception is
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'messageBroker' defined in class
path resource [gr-service_applicationContext.xml]: Invocation of init method failed; nested exception is
org.springframework.beans.factory.BeanCreationException: Error creating bean with name
'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]:
Invocation of init method failed; nested exception is java.io.IOException: Unknown data type: 51

Caused by: java.io.IOException: Unknown data type: 51
        at org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:356)
        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:204)
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.getLastMessageBrokerSequenceId(JDBCPersistenceAdapter.java:236)
        at org.apache.activemq.broker.region.DestinationFactoryImpl.getLastMessageBrokerSequenceId(DestinationFactoryImpl.java:145)
       

This error happens when I use JDBC persistence with PostgreSQL. However, when the persistence adapter is KahaDB there are no issues.
The first time around it creates the ActiveMQ tables in the DB and populates the messages in the activemq_msgs table. The second time
around it simply fails while looking for the last message broker sequence id.

Here are the test artifacts:

[activemq.xml]



<!--
  Broker definition: an ActiveMQ broker with JMX disabled, persisting through
  the JDBC adapter to the "dataSource" bean below and accepting clients on
  tcp://127.0.0.1:61638.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false">

    <!-- Message store: JDBC, backed by the pooled data source declared below. -->
    <persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#dataSource"/>
    </persistenceAdapter>

    <!-- Single OpenWire/TCP endpoint bound to the loopback interface. -->
    <transportConnectors>
      <transportConnector uri="tcp://127.0.0.1:61638"/>
    </transportConnectors>

  </broker>

  <!-- Commons DBCP connection pool for the "jmstest" database on localhost. -->
  <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="org.postgresql.Driver"/>
    <property name="url" value="jdbc:postgresql://localhost/jmstest"/>
    <property name="username" value="jmstest"/>
    <property name="password" value="jmstest"/>
    <property name="initialSize" value="1"/>
    <property name="maxActive" value="10"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

</beans>




[messageBroker_applicationContext.xml]

<!--
  Application context for the stand-alone broker process: exposes the broker
  described by classpath:activemq.xml as the "broker" bean.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- Builds the broker from activemq.xml; start=true asks the factory to start it. -->
  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
    <property name="config" value="classpath:activemq.xml" />
    <property name="start" value="true" />
  </bean>

</beans>

[messageListener_applicationContext.xml]

<!--
  Application context for the listener test: a JmsTemplate bound to
  "defaultQueue" on tcp://127.0.0.1:61638, injected into MessageListenerTest.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!--
    NOTE(review): this declares a broker from the same activemq.xml (same TCP
    port and JDBC store) that the stand-alone broker context uses. If this bean
    is instantiated while that broker is running, the two would presumably
    compete for the port and the database; confirm whether it is needed here.
  -->
  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
    <property name="config" value="classpath:activemq.xml" />
    <!-- Property name is "start" (was misspelled, which is not a BrokerFactoryBean property). -->
    <property name="start" value="true" />
  </bean>

  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61638"/>
  </bean>

  <!-- pubSubDomain=false: the default destination is a queue, not a topic. -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestinationName" value="defaultQueue" />
    <property name="pubSubDomain" value="false" />
  </bean>

  <bean id="messageListenerTest" class="MessageListenerTest">
    <property name="jmsTemplate" ref="jmsTemplate" />
  </bean>

</beans>

[messageSender_applicationContext.xml]

<!--
  Application context for the sender test: a JmsTemplate bound to
  "defaultQueue" on tcp://127.0.0.1:61638, injected into MessageSenderTest.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!--
    NOTE(review): this declares a broker from the same activemq.xml (same TCP
    port and JDBC store) that the stand-alone broker context uses. If this bean
    is instantiated while that broker is running, the two would presumably
    compete for the port and the database; confirm whether it is needed here.
  -->
  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
    <property name="config" value="classpath:activemq.xml" />
    <property name="start" value="true" />
  </bean>

  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61638"/>
  </bean>

  <!-- pubSubDomain=false: the default destination is a queue, not a topic. -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestinationName" value="defaultQueue" />
    <property name="pubSubDomain" value="false" />
  </bean>

  <bean id="messageSenderTest" class="MessageSenderTest">
    <property name="jmsTemplate" ref="jmsTemplate" />
  </bean>

</beans>

[JAVA CLASSES]

import org.apache.activemq.xbean.XBeanBrokerService;
import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.core.io.ClassPathResource;


public class StartMessageBroker {

        public static void main(String[] args) throws InterruptedException
        {
                XmlBeanFactory beanFactory = new XmlBeanFactory(new ClassPathResource("messageBroker_applicationContext.xml"));
                XBeanBrokerService broker = (XBeanBrokerService) beanFactory.getBean("broker");
                System.out.println("STARTED the broker");
               
                Object lock = new Object();
                synchronized (lock)
                {
                        lock.wait();
                }
        }
}



import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jms.core.JmsTemplate;

public class MessageListenerTest implements MessageListener {

        private JmsTemplate jmsTemplate;
       
       
    public JmsTemplate getJmsTemplate() {
                return jmsTemplate;
        }


        public void setJmsTemplate(JmsTemplate jmsTemplate) {
                this.jmsTemplate = jmsTemplate;
        }


        public void onMessage(Message message)
    {
        try
        {
            System.out.println(((TextMessage) message).getText());
        }
        catch (JMSException ex)
        {
            throw new RuntimeException(ex);
        }
    }
       
        public static void main(String[] args) throws InterruptedException
        {
                XmlBeanFactory beanFactory = new XmlBeanFactory(new ClassPathResource("messageListener_applicationContext.xml"));
                MessageListenerTest messageListenerTest = (MessageListenerTest) beanFactory.getBean("messageListenerTest");
                int receivedMessages = 0;
                System.out.println("Started Message Listener");
                while (true)
                {
                        Message message = messageListenerTest.getJmsTemplate().receive(messageListenerTest.getJmsTemplate().
                                                                                                                        getDefaultDestinationName());
                        receivedMessages++;
                        //System.out.println("message: " + message);
                        if (receivedMessages%100 == 0)
                        {
                                System.out.println("Received " + receivedMessages + " messages");
                        }
                }
               
        }
}

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class MessageSenderTest {

    private JmsTemplate jmsTemplate;
    private Queue queue;

   

    public JmsTemplate getJmsTemplate() {
                return jmsTemplate;
        }

        public void setJmsTemplate(JmsTemplate jmsTemplate) {
                this.jmsTemplate = jmsTemplate;
        }

        public Queue getQueue() {
                return queue;
        }

        public void setQueue(Queue queue) {
                this.queue = queue;
        }

        public void simpleSend(final String pMessage)
    {
        getJmsTemplate().send(new MessageCreator()
        {
            public Message createMessage(Session session) throws JMSException
            {
              return session.createTextMessage(pMessage);
            }
        });
    }
   
        public static void main(String[] args) throws InterruptedException
        {
                XmlBeanFactory beanFactory = new XmlBeanFactory(new ClassPathResource("messageSender_applicationContext.xml"));
                MessageSenderTest messageSenderTest = (MessageSenderTest) beanFactory.getBean("messageSenderTest");
               
                System.out.println("Started Message Sender");
               
                int numItems = 1000000;
                System.out.println("Sending " + numItems + " messages ...");
                for (int i=0; i < numItems; i++)
                {
                        messageSenderTest.simpleSend("Message-" + i);
                        if (0 != i && i%1000 == 0)
                        {
                                System.out.println("Sent " + i + " messages");
                        }
                }
                System.out.println("Done sending " + numItems + " messages");
        }
}