2007年12月3日星期一

ActiveMQ + Spring 2.5 配置

配置Broker:

    <!-- Embedded ActiveMQ broker, built by BrokerFactoryBean from the broker config file named by "config".
         lazy-init="false" opts this bean out of the default-lazy-init="true" declared on the enclosing <beans>. -->
    <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean" lazy-init="false">
        <!-- Classpath location of the broker definition (the activemq.xml shown below). -->
        <property name="config" value="classpath:com/xued/techevaluate/mq/activemq.xml" />
        <!-- NOTE(review): presumably makes the factory bean start the broker as soon as it is initialized; confirm against the ActiveMQ version in use. -->
        <property name="start" value="true" />
    </bean>

activemq.xml内容:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Broker definition loaded by the "broker" bean's config property. -->
<beans>
    <!-- JMX and persistence are both switched off via the useJmx / persistent attributes. -->
    <broker xmlns="http://activemq.org/config/1.0" useJmx="false" persistent="false">
    <transportConnectors>
      <!-- TCP connector; the remote client's brokerURL (tcp://localhost:61616) points at this address. -->
      <transportConnector uri="tcp://localhost:61616"/>
    </transportConnectors>
   </broker>
 </beans>

配置ConnectionFactory:

    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory ">
        <property name="brokerURL" value="vm://localhost" />
        <property name="copyMessageOnSend" value="false" />
        <property name="objectMessageSerializationDefered" value="true" />
    </bean>
   
    <bean id="jmsTxManager" class="org.springframework.jms.connection.JmsTransactionManager" lazy-init="false">
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>

采用 vm://localhost 方式的 ConnectionFactory 供与 Broker 处于同一个 JVM 内的消费者使用:连接在进程内直接与嵌入式 Broker 通信,不经过网络。

配置listener-container:

首先加入xml-schema,类似以下内容:
<!-- Root element declaring the beans, context and jms namespaces (Spring 2.5 schemas).
     Namespace URIs must match exactly: no leading or trailing whitespace inside the quotes. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-2.5.xsd
       http://www.springframework.org/schema/jms
       http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" default-lazy-init="true">

加入listener-container内容:
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor ">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
    </bean>
   
    <jms:listener-container connection-factory="jmsFactory" task-executor="taskExecutor" concurrency="10" acknowledge="transacted">
        <jms:listener destination="TEST.FOO" ref="messageReceiver" method="receive" />
    </jms:listener-container>

以上配置的含义是,监听名为TEST.FOO的queue,message收到后交给messageReceiver的bean处理,处理方法名为receive。

messageReceiver代码:

import java.io.Serializable;
import org.springframework.stereotype.Component;

@Component(value = "messageReceiver")
public class MessageReceiver {

    public void receive(String msg) {
        System.out.println(msg);
    }

    public void receive(Serializable msg) {
        TransferedObject obj = (TransferedObject) msg;
        System.out.println (obj.getMsg());
    }
}

客户端配置:

    <!-- Client-side connection factory: a PooledConnectionFactory wrapping a plain
         ActiveMQConnectionFactory. destroy-method="stop" has Spring call stop() on the pool
         when the application context is closed. -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- Same address as the broker's TCP transportConnector. -->
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
    </bean>
   
    <!-- JmsTemplate used by the sender; it obtains its connections from the pooled factory above. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory" />
    </bean>

使用PooledConnectionFactory的目的是避免JmsTemplate每次发送都创建一个新的连接。

客户端代码:

import com.xued.techevaluate.mq.TransferedObject;
import java.io.Serializable;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session ;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator ;
import org.springframework.stereotype.Component;

@Component(value = "sender")
public class jmsSender {
    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;
    public void sendString(String queueName, final String msg) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
    public void sendObject(String queueName, final Serializable obj) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage (obj);
            }
        });
    }
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("MqclientContext.xml");
        jmsSender sender = (jmsSender) ctx.getBean ("sender");
        for (int i = 0; i < 100; i++) {
            sender.sendObject("TEST.FOO", new TransferedObject("My test queue object " + i));
        }
        sender.sendString("TEST.FOO", "shutdown");
    }
}

没有评论: