2008年1月7日星期一

JDK5原子变量实践――Ejb调用自动重连

  1. ConnectorStatusPair类,需要同步两个状态,一个是进入连接队列线程数,一个是连接时间。目的是同一时间只有一个线程在重连,间隔一定时间只重连一次。

    import java.util.concurrent.atomic.AtomicReference;

    public class ConnectorStatusPair {

        private static class Pair {

            final int qLength;
            final long timeStamp;

            public Pair(int qLength, long timeStamp) {
                this.qLength = qLength;
                this.timeStamp = timeStamp;
            }
        }
        private AtomicReference<Pair> values;

        public ConnectorStatusPair(int qLength, long timeStamp) {
            values = new AtomicReference<Pair>(new Pair(qLength, timeStamp));
        }
       
        public int getQLength() {
            return values.get().qLength;
        }

        public long getTimeStamp() {
            return values.get().timeStamp;
        }

        /**
         * 原子操作加1
         * @return 更新后的值
         */
        public int incrementAndGetQLength() {
            while (true) {
                Pair oldv = values.get();
                Pair newv = new Pair(oldv.qLength + 1, oldv.timeStamp);
                if (values.compareAndSet(oldv, newv)) {
                    return newv.qLength;
                }
            }
        }

        /**
         * 原子操作减一
         * @return 更新后的值
         */
        public int decrementAndGetQLength() {
            while (true) {
                Pair oldv = values.get();
                Pair newv = new Pair(oldv.qLength - 1, oldv.timeStamp);
                if (values.compareAndSet(oldv, newv)) {
                    return newv.qLength;
                }
            }
        }

        /**
         * 原子操作比较当前时间减去values保存的时间是否大于给定的时间间隔,以毫秒计,如果为真则更新values保存的时间为当前时间
         * @param reconnectTimePeriod
         * @return 如果当前时间减去values保存的时间大于给定的时间间隔返回真,否则返回非真
         */
        public boolean compareAndSetTimeStamp(int reconnectTimePeriod) {
            while (true) {
                Pair oldv = values.get();
                long time = System.currentTimeMillis();
                if ((time - oldv.timeStamp) < reconnectTimePeriod) {
                    return false;
                }
                Pair newv = new Pair(oldv.qLength, time);
                if (values.compareAndSet(oldv, newv)) {
                    return true;
                }
            }
        }
    }

  2. Ejb3ClientProxy类,调用ejb方法,支持自动重连同步

    import com.xued.clusteredejb.ExRemote;
    import java.util.Properties ;
    import javax.naming.Context;
    import javax.naming.InitialContext ;
    import javax.naming.NamingException;
    import org.jboss.remoting.CannotConnectException ;

    public class Ejb3ClientProxy {

        private ExRemote exBean;
        private Context ctx;
        private ConnectorStatusPair connectorStatus = new ConnectorStatusPair(0, 0);

        public Ejb3ClientProxy(String hostUrl) {
            try {
                Properties props = new Properties();
                props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
                props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
                props.put("java.naming.provider.url", hostUrl);
                props.put("jnp.socketFactory", "org.jnp.interfaces.TimedSocketFactory ");
                props.put("jnp.timeout", "10000");
                props.put("jnp.sotimeout", "10000");
                this.ctx = new InitialContext(props);
                getExBeanInstance();
            } catch (NamingException ex) {
                System.out.println("EJB3 client initialize failed!");
            }
        }

        public String invoke(String name) {
            if (connectorStatus.getQLength() != 0) {
                return "not alive";
            }
            String res = null;
            try {
                res = exBean.invoke(name);
            } catch (CannotConnectException ex) {
                if (isNeedReconnect()) {
                    res = refreshConnection(name);
                } else {
                    res =  ex.getClass().getName() + ": " + ex.getMessage();
                }
                connectorStatus.decrementAndGetQLength ();
            } catch (Exception ex) {
                res = ex.getClass ().getName() + ": " + ex.getMessage();
            }
            return res;
        }

        private boolean isNeedReconnect(){
            return connectorStatus.incrementAndGetQLength() == 1 &&
                    connectorStatus.compareAndSetTimeStamp(60000); //间隔60秒重连一次
        }
       
       
        private void getExBeanInstance() throws NamingException {
            this.exBean = (ExRemote) this.ctx.lookup("ExBean/remote");
        }

        private String refreshConnection(String name) {
            String res;
            try {
                getExBeanInstance();
                res = exBean.invoke(name+" Server was crashed");
            } catch (NamingException ex1) {
                System.out.println("reconnect failed");
                res = "error occured";
            }
            return res;
        }
    }



没有评论: