2007年12月5日星期三

线程安全的读优化Map实现

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * A read-optimized, thread-safe map built on copy-on-write snapshots.
 *
 * <p>Suited to shared data that changes rarely but is read concurrently by many
 * threads, and only when readers do not need the absolutely latest value: a reader
 * may observe the snapshot that was current just before a concurrent write completed.
 * Readers never take a lock, which avoids an unnecessary queuing point in highly
 * concurrent applications. Writers are serialized on a private monitor and each
 * write copies the whole map, so writes cost O(size of the map).
 *
 * <p>Requires Java 5.0 or later (relies on the Java 5 memory-model semantics of
 * {@code volatile}).
 *
 * @author xued
 */
public class ReadNoLockingMap {

    // Must be volatile so that reader threads observe the most recently published snapshot.
    // A snapshot is never modified after it has been assigned to this field.
    private volatile Map<Object, Object> currentMap;

    // Monitor that serializes writers; final so the lock reference can never be swapped
    // and is safely visible to every thread that sees this instance.
    private final Object lockbox;

    /** Creates an empty map. */
    public ReadNoLockingMap() {
        this.currentMap = new HashMap<Object, Object>();
        this.lockbox = new Object();
    }

    /**
     * Publishes a new snapshot containing all current entries plus the given mapping.
     *
     * @param key   key to associate the value with
     * @param value value to store; replaces any existing value for {@code key}
     */
    public void putNewMap(Object key, Object value) {
        synchronized (lockbox) {
            // Copy, modify the private copy, then publish it with a single volatile write.
            Map<Object, Object> newMap = new HashMap<Object, Object>(currentMap);
            newMap.put(key, value);
            currentMap = newMap;
        }
    }

    /**
     * Publishes a new snapshot containing all current entries plus every entry of {@code t}.
     *
     * @param t mappings to add; entries with existing keys replace the current values
     * @throws NullPointerException if {@code t} is {@code null}
     */
    public void putNewMap(Map<?, ?> t) {
        synchronized (lockbox) {
            Map<Object, Object> newMap = new HashMap<Object, Object>(currentMap);
            newMap.putAll(t);
            currentMap = newMap;
        }
    }

    /**
     * Looks up a key in the current snapshot without locking. Called by consumer threads.
     *
     * @param key key to look up
     * @return the value mapped to {@code key} in the snapshot read by this call,
     *         or {@code null} if there is none
     */
    public Object getFromCurrentMap(Object key) {
        // Read the volatile field exactly once so the lookup runs against one snapshot.
        Map<Object, Object> snapshot = currentMap;
        return snapshot.get(key);
    }

    /**
     * Demo: starts 100 writer threads that each store a random value under "key",
     * then 100 reader threads that each print the value they observe.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final ReadNoLockingMap nlMap = new ReadNoLockingMap();
        nlMap.putNewMap("key", "Starting...");
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                public void run() {
                    int value = new Random().nextInt();
                    nlMap.putNewMap("key", value);
                    System.out.println(Thread.currentThread().getName() + ": write map " + value);
                }
            }).start();
        }

        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ": " + nlMap.getFromCurrentMap("key"));
                }
            }).start();
        }
    }
}

2007年12月3日星期一

ActiveMq+Spring2.5配置

配置Broker:

    <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean" lazy-init="false">
        <property name="config" value="classpath:com/xued/techevaluate/mq/activemq.xml" />
        <property name="start" value="true" />
    </bean>

activemq.xml内容:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker xmlns="http://activemq.org/config/1.0" useJmx="false" persistent="false">
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61616"/>
    </transportConnectors>
   </broker>
 </beans>

配置ConnectionFactory:

    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost" />
        <property name="copyMessageOnSend" value="false" />
        <property name="objectMessageSerializationDefered" value="true" />
    </bean>
   
    <bean id="jmsTxManager" class="org.springframework.jms.connection.JmsTransactionManager" lazy-init="false">
        <property name="connectionFactory" ref="jmsFactory"/>
    </bean>

采用vm://localhost方式的ConnectionFactory用于同一个JVM内部的消费者

配置listener-container:

首先加入xml-schema,类似以下内容:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-2.5.xsd
       http://www.springframework.org/schema/jms
       http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" default-lazy-init="true">

加入listener-container内容:
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
    </bean>
   
    <jms:listener-container connection-factory="jmsFactory" task-executor="taskExecutor" concurrency="10" acknowledge="transacted">
        <jms:listener destination="TEST.FOO" ref="messageReceiver" method="receive" />
    </jms:listener-container>

以上配置的含义是,监听名为TEST.FOO的queue,message收到后交给messageReceiver的bean处理,处理方法名为receive。

messageReceiver代码:

import java.io.Serializable;
import org.springframework.stereotype.Component;

@Component(value = "messageReceiver")
public class MessageReceiver {

    public void receive(String msg) {
        System.out.println(msg);
    }

    public void receive(Serializable msg) {
        TransferedObject obj = (TransferedObject) msg;
        System.out.println (obj.getMsg());
    }
}

客户端配置:

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
    </bean>
   
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory" />
    </bean>

使用PooledConnectionFactory的目的是避免JmsTemplate每次发送都创建一个新的连接。

客户端代码:

import com.xued.techevaluate.mq.TransferedObject;
import java.io.Serializable;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session ;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator ;
import org.springframework.stereotype.Component;

@Component(value = "sender")
public class jmsSender {
    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;
    public void sendString(String queueName, final String msg) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
    public void sendObject(String queueName, final Serializable obj) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage (obj);
            }
        });
    }
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("MqclientContext.xml");
        jmsSender sender = (jmsSender) ctx.getBean ("sender");
        for (int i = 0; i < 100; i++) {
            sender.sendObject("TEST.FOO", new TransferedObject("My test queue object " + i));
        }
        sender.sendString("TEST.FOO", "shutdown");
    }
}