配置Broker:
<!-- Embedded ActiveMQ broker, created from the XBean configuration file named in "config". -->
<bean id="broker"
      class="org.apache.activemq.xbean.BrokerFactoryBean"
      lazy-init="false">
    <!-- Classpath location of the broker definition. -->
    <property name="config" value="classpath:com/xued/techevaluate/mq/activemq.xml"/>
    <!-- Ask the factory bean to start the broker itself. -->
    <property name="start" value="true"/>
</bean>
activemq.xml内容:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Broker definition: JMX off, persistence off, one TCP transport connector. -->
<beans>
    <broker xmlns="http://activemq.org/config/1.0"
            useJmx="false"
            persistent="false">
        <transportConnectors>
            <!-- Remote clients connect through this connector. -->
            <transportConnector uri="tcp://localhost:61616"/>
        </transportConnectors>
    </broker>
</beans>
配置ConnectionFactory:
<!--
  Connection factory for consumers living in the same JVM as the embedded broker.
  depends-on="broker" makes Spring initialise the configured broker first; without it a
  vm:// connection opened too early may auto-create a second broker with default settings.
-->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
    <!-- In-VM transport to the broker named "localhost". -->
    <property name="brokerURL" value="vm://localhost" />
    <!-- Do not copy each message on send. -->
    <property name="copyMessageOnSend" value="false" />
    <!-- Property name is spelled "Defered" in the ActiveMQ API; do not "correct" it. -->
    <property name="objectMessageSerializationDefered" value="true" />
</bean>
<!-- JMS transaction manager bound to the jmsFactory connection factory. -->
<bean id="jmsTxManager"
      class="org.springframework.jms.connection.JmsTransactionManager"
      lazy-init="false">
    <property name="connectionFactory" ref="jmsFactory" />
</bean>
采用vm://localhost方式的ConnectionFactory用于同一个JVM内部的消费者
配置listener-container:
首先加入xml-schema,类似以下内容:
<!-- Root element declaring the beans, context and jms namespaces (Spring 2.5 schemas). -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-2.5.xsd
           http://www.springframework.org/schema/jms
           http://www.springframework.org/schema/jms/spring-jms-2.5.xsd"
       default-lazy-init="true">
加入listener-container内容:
<!--
  Thread pool handed to the JMS listener container.
  NOTE(review): queueCapacity is not set here. If it defaults to an unbounded queue, the pool
  may never grow beyond corePoolSize, which would cap the listener container below its
  configured concurrency of 10 - confirm and set queueCapacity or corePoolSize if needed.
-->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="10" />
</bean>
<!-- Listener container using jmsFactory and taskExecutor, transacted acknowledge mode. -->
<jms:listener-container connection-factory="jmsFactory"
                        task-executor="taskExecutor"
                        concurrency="10"
                        acknowledge="transacted">
    <!-- Messages from TEST.FOO are passed to the receive method of the messageReceiver bean. -->
    <jms:listener destination="TEST.FOO" ref="messageReceiver" method="receive" />
</jms:listener-container>
以上配置的含义是,监听名为TEST.FOO的queue,message收到后交给messageReceiver的bean处理,处理方法名为receive。
messageReceiver代码:
import java.io.Serializable;
import org.springframework.stereotype.Component;
/**
 * Message-handling POJO registered under the bean name {@code messageReceiver}.
 * Both {@code receive} overloads print the incoming payload to standard output.
 */
@Component("messageReceiver")
public class MessageReceiver {

    /**
     * Prints a text payload.
     *
     * @param msg the text to print
     */
    public void receive(String msg) {
        System.out.println(msg);
    }

    /**
     * Prints the message held by a {@code TransferedObject} payload.
     *
     * @param msg the payload; it is cast to {@code TransferedObject} without a prior type check
     */
    public void receive(Serializable msg) {
        System.out.println(((TransferedObject) msg).getMsg());
    }
}
客户端配置:
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
</bean>
使用PooledConnectionFactory的目的是避免JmsTemplate每次发送都创建一个新的连接。
客户端代码:
import com.xued.techevaluate.mq.TransferedObject;
import java.io.Serializable;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session ;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator ;
import org.springframework.stereotype.Component;
/**
 * Client-side sender that publishes text and object messages through the injected
 * {@link JmsTemplate}. Registered under the bean name {@code sender}.
 */
@Component(value = "sender")
public class jmsSender {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * Sends {@code msg} as a JMS text message.
     *
     * @param queueName name of the destination to send to
     * @param msg       text body of the message
     */
    public void sendString(String queueName, final String msg) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }

    /**
     * Sends {@code obj} as a JMS object message.
     *
     * @param queueName name of the destination to send to
     * @param obj       serializable payload of the message
     */
    public void sendObject(String queueName, final Serializable obj) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(obj);
            }
        });
    }

    /**
     * Demo entry point: sends 100 object messages followed by the text "shutdown" to TEST.FOO.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Declared with the concrete type so close() is available without a cast.
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("MqclientContext.xml");
        try {
            jmsSender sender = (jmsSender) ctx.getBean("sender");
            for (int i = 0; i < 100; i++) {
                sender.sendObject("TEST.FOO", new TransferedObject("My test queue object " + i));
            }
            sender.sendString("TEST.FOO", "shutdown");
        } finally {
            // Closing the context runs bean destroy callbacks (e.g. the pooled connection
            // factory's destroy-method), so connections are released instead of leaked.
            ctx.close();
        }
    }
}
没有评论:
发表评论