- ConnectorStatusPair类,需要同步两个状态,一个是进入连接队列线程数,一个是连接时间。目的是同一时间只有一个线程在重连,间隔一定时间只重连一次。
import java.util.concurrent.atomic.AtomicReference;
public class ConnectorStatusPair {
private static class Pair {
final int qLength;
final long timeStamp;
public Pair(int qLength, long timeStamp) {
this.qLength = qLength;
this.timeStamp = timeStamp;
}
}
private AtomicReference<Pair> values;
public ConnectorStatusPair(int qLength, long timeStamp) {
values = new AtomicReference<Pair>(new Pair(qLength, timeStamp));
}
public int getQLength() {
return values.get().qLength;
}
public long getTimeStamp() {
return values.get().timeStamp;
}
/**
* 原子操作加1
* @return 更新后的值
*/
public int incrementAndGetQLength() {
while (true) {
Pair oldv = values.get();
Pair newv = new Pair(oldv.qLength + 1, oldv.timeStamp);
if (values.compareAndSet(oldv, newv)) {
return newv.qLength;
}
}
}
/**
* 原子操作减一
* @return 更新后的值
*/
public int decrementAndGetQLength() {
while (true) {
Pair oldv = values.get();
Pair newv = new Pair(oldv.qLength - 1, oldv.timeStamp);
if (values.compareAndSet(oldv, newv)) {
return newv.qLength;
}
}
}
/**
* 原子操作比较当前时间减去values保存的时间是否大于给定的时间间隔,以毫秒计,如果为真则更新values保存的时间为当前时间
* @param reconnectTimePeriod
* @return 如果当前时间减去values保存的时间大于给定的时间间隔返回真,否则返回非真
*/
public boolean compareAndSetTimeStamp(int reconnectTimePeriod) {
while (true) {
Pair oldv = values.get();
long time = System.currentTimeMillis();
if ((time - oldv.timeStamp) < reconnectTimePeriod) {
return false;
}
Pair newv = new Pair(oldv.qLength, time);
if (values.compareAndSet(oldv, newv)) {
return true;
}
}
}
}
- Ejb3ClientProxy类,调用ejb方法,支持自动重连同步
import com.xued.clusteredejb.ExRemote;
import java.util.Properties ;
import javax.naming.Context;
import javax.naming.InitialContext ;
import javax.naming.NamingException;
import org.jboss.remoting.CannotConnectException ;
public class Ejb3ClientProxy {
private ExRemote exBean;
private Context ctx;
private ConnectorStatusPair connectorStatus = new ConnectorStatusPair(0, 0);
public Ejb3ClientProxy(String hostUrl) {
try {
Properties props = new Properties();
props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
props.put("java.naming.provider.url", hostUrl);
props.put("jnp.socketFactory", "org.jnp.interfaces.TimedSocketFactory ");
props.put("jnp.timeout", "10000");
props.put("jnp.sotimeout", "10000");
this.ctx = new InitialContext(props);
getExBeanInstance();
} catch (NamingException ex) {
System.out.println("EJB3 client initialize failed!");
}
}
public String invoke(String name) {
if (connectorStatus.getQLength() != 0) {
return "not alive";
}
String res = null;
try {
res = exBean.invoke(name);
} catch (CannotConnectException ex) {
if (isNeedReconnect()) {
res = refreshConnection(name);
} else {
res = ex.getClass().getName() + ": " + ex.getMessage();
}
connectorStatus.decrementAndGetQLength ();
} catch (Exception ex) {
res = ex.getClass ().getName() + ": " + ex.getMessage();
}
return res;
}
private boolean isNeedReconnect(){
return connectorStatus.incrementAndGetQLength() == 1 &&
connectorStatus.compareAndSetTimeStamp(60000); //间隔60秒重连一次
}
private void getExBeanInstance() throws NamingException {
this.exBean = (ExRemote) this.ctx.lookup("ExBean/remote");
}
private String refreshConnection(String name) {
String res;
try {
getExBeanInstance();
res = exBean.invoke(name+" Server was crashed");
} catch (NamingException ex1) {
System.out.println("reconnect failed");
res = "error occured";
}
return res;
}
}
2008年1月7日星期一
JDK5原子变量实践――Ejb调用自动重连
订阅:
博文评论 (Atom)
没有评论:
发表评论