import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
 * A map holder whose readers never take a lock.
 *
 * <p>This technique suits shared data that is rarely modified but is read by many threads at the
 * same time. It only applies when the application does not need the absolutely latest data: a
 * reader may briefly keep seeing the previous snapshot while a writer is publishing a new one.
 * In highly concurrent environments it avoids introducing an unnecessary queuing point.
 *
 * <p>Every write copies the current map, modifies the copy and then publishes it through a
 * volatile field; a published map is never modified afterwards.
 *
 * <p>Requires Java 5.0 or later.
 *
 * @author xued
 */
public class ReadNoLockingMap {
    // Must be volatile so that consumer threads observe the most recently published map.
    private volatile Map<Object, Object> currentMap;
    // Monitor that serializes writers; final so the lock identity can never change.
    private final Object lockbox;

    /** Creates an instance that starts with an empty map. */
    public ReadNoLockingMap() {
        this.currentMap = new HashMap<Object, Object>();
        this.lockbox = new Object();
    }

    /**
     * Publishes a new snapshot that contains every current entry plus the given mapping.
     *
     * @param key key to add or overwrite
     * @param value value to associate with {@code key}
     */
    public void putNewMap(Object key, Object value) {
        synchronized (lockbox) {
            // Work on a private copy so readers holding the old snapshot are never disturbed.
            Map<Object, Object> newMap = new HashMap<Object, Object>(currentMap);
            newMap.put(key, value);
            currentMap = newMap;
        }
    }

    /**
     * Publishes a new snapshot that contains every current entry plus all entries of {@code t}.
     * Entries of {@code t} overwrite existing entries that share the same key.
     *
     * @param t mappings to merge in; must not be {@code null}
     */
    public void putNewMap(Map<?, ?> t) {
        synchronized (lockbox) {
            Map<Object, Object> newMap = new HashMap<Object, Object>(currentMap);
            newMap.putAll(t);
            currentMap = newMap;
        }
    }

    /**
     * Looks up a key in the currently published snapshot. Called by consumer threads.
     *
     * @param key key to look up
     * @return the value mapped to {@code key}, or {@code null} if there is none
     */
    public Object getFromCurrentMap(Object key) {
        // Read the volatile field once; no locking is needed because the snapshot is never mutated.
        Map<Object, Object> snapshot = currentMap;
        return snapshot.get(key);
    }

    /** Demo: starts 100 writer threads and 100 reader threads against one shared instance. */
    public static void main(String[] args) {
        final ReadNoLockingMap nlMap = new ReadNoLockingMap();
        nlMap.putNewMap("key", "Starting...");
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                public void run() {
                    int value = new Random().nextInt();
                    nlMap.putNewMap("key", value);
                    System.out.println(Thread.currentThread().getName() + ": write map " + value);
                }
            }).start();
        }
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ": " + nlMap.getFromCurrentMap("key"));
                }
            }).start();
        }
    }
}
2007年12月5日星期三
2007年12月3日星期一
ActiveMq+Spring2.5配置
配置Broker:
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean" lazy-init="false">
<property name="config" value="classpath:com/xued/techevaluate/mq/activemq.xml" />
<property name="start" value="true" />
</bean>
activemq.xml内容:
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker xmlns="http://activemq.org/config/1.0" useJmx="false" persistent="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>
配置ConnectionFactory:
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost" />
<property name="copyMessageOnSend" value="false" />
<property name="objectMessageSerializationDefered" value="true" />
</bean>
<bean id="jmsTxManager" class="org.springframework.jms.connection.JmsTransactionManager" lazy-init="false">
<property name="connectionFactory" ref="jmsFactory"/>
</bean>
采用vm://localhost方式的ConnectionFactory用于同一个JVM内部的消费者
配置listener-container:
首先加入xml-schema,类似以下内容:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" default-lazy-init="true">
加入listener-container内容:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="10" />
</bean>
<jms:listener-container connection-factory="jmsFactory" task-executor="taskExecutor" concurrency="10" acknowledge="transacted">
<jms:listener destination="TEST.FOO" ref="messageReceiver" method="receive" />
</jms:listener-container>
以上配置的含义是,监听名为TEST.FOO的queue,message收到后交给messageReceiver的bean处理,处理方法名为receive。
messageReceiver代码:
import java.io.Serializable;
import org.springframework.stereotype.Component;
/**
 * Component named "messageReceiver" that prints every payload handed to it on standard output.
 */
@Component(value = "messageReceiver")
public class MessageReceiver {

    /**
     * Prints a text payload unchanged.
     *
     * @param msg text to print
     */
    public void receive(String msg) {
        System.out.println(msg);
    }

    /**
     * Prints the message text carried by an object payload.
     *
     * @param msg payload; it is cast to {@code TransferedObject} without a type check, so any
     *            other type fails with a {@code ClassCastException}
     */
    public void receive(Serializable msg) {
        System.out.println(((TransferedObject) msg).getMsg());
    }
}
客户端配置:
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
</bean>
使用PooledConnectionFactory的目的是避免JmsTemplate每次发送都创建一个新的连接。
客户端代码:
import com.xued.techevaluate.mq.TransferedObject;
import java.io.Serializable;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session ;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator ;
import org.springframework.stereotype.Component;
/**
 * Component named "sender" that publishes text and object messages through the injected
 * {@code JmsTemplate}.
 */
@Component(value = "sender")
public class jmsSender {
    // Injected by name from the bean called "jmsTemplate".
    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * Sends a text message to the named destination.
     *
     * @param queueName destination name
     * @param msg text to send
     */
    public void sendString(String queueName, final String msg) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }

    /**
     * Sends a serializable object as an object message to the named destination.
     *
     * @param queueName destination name
     * @param obj payload to send
     */
    public void sendObject(String queueName, final Serializable obj) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(obj);
            }
        });
    }

    /**
     * Demo client: sends 100 object messages followed by the text "shutdown" to TEST.FOO.
     */
    public static void main(String[] args) {
        // Keep the concrete type so the context can be closed once sending is done.
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("MqclientContext.xml");
        try {
            jmsSender sender = (jmsSender) ctx.getBean("sender");
            for (int i = 0; i < 100; i++) {
                sender.sendObject("TEST.FOO", new TransferedObject("My test queue object " + i));
            }
            sender.sendString("TEST.FOO", "shutdown");
        } finally {
            // Closing the context runs the singleton destroy callbacks (for example a
            // destroy-method declared on a connection factory bean), releasing what they hold.
            ctx.close();
        }
    }
}
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean" lazy-init="false">
<property name="config" value="classpath:com/xued/techevaluate/mq/activemq.xml" />
<property name="start" value="true" />
</bean>
activemq.xml内容:
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker xmlns="http://activemq.org/config/1.0" useJmx="false" persistent="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>
配置ConnectionFactory:
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost" />
<property name="copyMessageOnSend" value="false" />
<property name="objectMessageSerializationDefered" value="true" />
</bean>
<bean id="jmsTxManager" class="org.springframework.jms.connection.JmsTransactionManager" lazy-init="false">
<property name="connectionFactory" ref="jmsFactory"/>
</bean>
采用vm://localhost方式的ConnectionFactory用于同一个JVM内部的消费者
配置listener-container:
首先加入xml-schema,类似以下内容:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" default-lazy-init="true">
加入listener-container内容:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="10" />
</bean>
<jms:listener-container connection-factory="jmsFactory" task-executor="taskExecutor" concurrency="10" acknowledge="transacted">
<jms:listener destination="TEST.FOO" ref="messageReceiver" method="receive" />
</jms:listener-container>
以上配置的含义是,监听名为TEST.FOO的queue,message收到后交给messageReceiver的bean处理,处理方法名为receive。
messageReceiver代码:
import java.io.Serializable;
import org.springframework.stereotype.Component;
/**
 * Component named "messageReceiver" that prints every payload handed to it on standard output.
 */
@Component(value = "messageReceiver")
public class MessageReceiver {

    /**
     * Prints a text payload unchanged.
     *
     * @param msg text to print
     */
    public void receive(String msg) {
        System.out.println(msg);
    }

    /**
     * Prints the message text carried by an object payload.
     *
     * @param msg payload; it is cast to {@code TransferedObject} without a type check, so any
     *            other type fails with a {@code ClassCastException}
     */
    public void receive(Serializable msg) {
        System.out.println(((TransferedObject) msg).getMsg());
    }
}
客户端配置:
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
</bean>
使用PooledConnectionFactory的目的是避免JmsTemplate每次发送都创建一个新的连接。
客户端代码:
import com.xued.techevaluate.mq.TransferedObject;
import java.io.Serializable;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session ;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator ;
import org.springframework.stereotype.Component;
/**
 * Component named "sender" that publishes text and object messages through the injected
 * {@code JmsTemplate}.
 */
@Component(value = "sender")
public class jmsSender {
    // Injected by name from the bean called "jmsTemplate".
    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * Sends a text message to the named destination.
     *
     * @param queueName destination name
     * @param msg text to send
     */
    public void sendString(String queueName, final String msg) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }

    /**
     * Sends a serializable object as an object message to the named destination.
     *
     * @param queueName destination name
     * @param obj payload to send
     */
    public void sendObject(String queueName, final Serializable obj) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(obj);
            }
        });
    }

    /**
     * Demo client: sends 100 object messages followed by the text "shutdown" to TEST.FOO.
     */
    public static void main(String[] args) {
        // Keep the concrete type so the context can be closed once sending is done.
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("MqclientContext.xml");
        try {
            jmsSender sender = (jmsSender) ctx.getBean("sender");
            for (int i = 0; i < 100; i++) {
                sender.sendObject("TEST.FOO", new TransferedObject("My test queue object " + i));
            }
            sender.sendString("TEST.FOO", "shutdown");
        } finally {
            // Closing the context runs the singleton destroy callbacks (for example a
            // destroy-method declared on a connection factory bean), releasing what they hold.
            ctx.close();
        }
    }
}
订阅:
评论 (Atom)