首页 技术 正文
技术 2022年11月14日
0 收藏 301 点赞 4,129 浏览 7100 个字
package com.autonavi.tinfo.traffic.zookeeper;import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;import com.github.zkclient.IZkChildListener;
import com.github.zkclient.IZkStateListener;
import com.github.zkclient.ZkClient;public class DistributedZookeeper {
private static final Logger logger = LoggerFactory.getLogger(DistributedZookeeper.class);
private Lock lock = new ReentrantLock();// 锁对象
private int sessionTimeout;
private int connectionTimeout;
private String zkServerList;
// private String zkServerDir = "tmc-city-root-path";
private String subNode = "tmclr";
private String curPath;
private ZkClient zkClient;
private String[] resourcePath;
private String zookeeperPath; public String getZookeeperPath() {
return zookeeperPath;
} public void setZookeeperPath(String zookeeperPath) {
this.zookeeperPath = zookeeperPath;
} private ClassPathXmlApplicationContext context = null; private void start() {
if (context == null) {
context = new ClassPathXmlApplicationContext(resourcePath);
}
} private void destroy() {
if (context != null) {
// context.registerShutdownHook(); logger.info("destroyed current application!!!");
context.stop();
context.close();
context.destroy();
context.registerShutdownHook();
context = null;
}
} public void connect() throws Exception { if (this.zkClient != null) {
this.zkClient.close();
}
this.zkClient = new ZkClient(zkServerList, sessionTimeout, connectionTimeout); if (!zkClient.exists(zookeeperPath)) {
zkClient.createPersistent(zookeeperPath, null);
}
if (curPath == null) {
curPath = zkClient.createEphemeralSequential(zookeeperPath + "/" + subNode, "monitor".getBytes());
} try {
startWatchingTopicStatus();
} catch (Exception e) {
// TODO Auto-generated catch block
logger.error(e.getMessage(), e);
logger.error("error occurs during sync data from zk");
System.exit(0);
}
Thread.sleep(2000);// */
handleMonitorNodeChange();
} public void startWatchingTopicStatus() {
ZkTopicStatusListener topicEventListener = new ZkTopicStatusListener();
ZkConnectedStatusListener connectedStatusListener = new ZkConnectedStatusListener();
try {
zkClient.subscribeChildChanges(zookeeperPath, topicEventListener);
zkClient.subscribeStateChanges(connectedStatusListener);
} catch (Exception e) {
logger.error(e.getMessage(), e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
startWatchingTopicStatus();
} } public void handleMonitorNodeChange() throws Exception {
this.lock.lock();
try {
if (zkClient == null)
return;
if (!zkClient.exists(zookeeperPath)) {
zkClient.createPersistent(zookeeperPath, null);
} // 确认curPath是否真的是列表中的最小节点
List<String> childs = zkClient.getChildren(zookeeperPath);
if (childs == null || childs.size() == 0) {
// 创建子节点
curPath = zkClient.createEphemeralSequential(zookeeperPath + "/" + subNode, "monitor".getBytes());
childs = zkClient.getChildren(zookeeperPath); }
Collections.sort(childs); String thisNode = curPath.substring((zookeeperPath + "/").length());
int index = childs.indexOf(thisNode);
if (index < 0) {
curPath = zkClient.createEphemeralSequential(zookeeperPath + "/" + subNode, "monitor".getBytes());
childs = zkClient.getChildren(zookeeperPath);
Collections.sort(childs);
thisNode = curPath.substring((zookeeperPath + "/").length());
index = childs.indexOf(thisNode);
} if (index == 0) {
// 确实是最小节点
start();
} else {
destroy();
}
} finally {
this.lock.unlock();
}
} class ZkTopicStatusListener implements IZkChildListener { @Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
handleMonitorNodeChange();
}
} class ZkConnectedStatusListener implements IZkStateListener { @Override
public void handleStateChanged(KeeperState state) throws Exception {
// TODO Auto-generated method stub ConnectedReadOnly
if (state.equals(KeeperState.SyncConnected) || state.equals(KeeperState.ConnectedReadOnly)) {
System.out.println("zookeeper start to be connected");
handleMonitorNodeChange();
} else if (state.equals(KeeperState.Disconnected)) {
destroy();
}
} @Override
public void handleNewSession() throws Exception {
// TODO Auto-generated method stub
} } public void stop() {
destroy();
if (zkClient == null) {
logger.warn("cannot shutdown already shutdown topic event watcher.");
return;
}
// stopWatchingTopicEvents();
zkClient.close();
zkClient = null;
} public void setZkServerList(String zkServerList) {
this.zkServerList = zkServerList;
} public int getSessionTimeout() {
return sessionTimeout;
} public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
} public int getConnectionTimeout() {
return connectionTimeout;
} public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
} public String[] getResourcePath() {
return resourcePath;
} public void setResourcePath(String[] resourcePath) {
this.resourcePath = resourcePath;
} public static void main(String[] args) throws Exception {
DistributedZookeeper statusMonitor = new DistributedZookeeper();
try {
if (args.length < 5) {
logger.warn("incomplete parameters.");
System.exit(0);
} // statusMonitor.setZkServerList("10.17.133.73:2181,10.17.133.73:2182,10.17.133.73:2183");
// statusMonitor.setConnectionTimeout(5000);
// statusMonitor.setSessionTimeout(5000);
// statusMonitor.setResourcePath(args); statusMonitor.setZkServerList(args[0]);
statusMonitor.setConnectionTimeout(Integer.valueOf(args[1]));
statusMonitor.setSessionTimeout(Integer.valueOf(args[2]));
statusMonitor.setZookeeperPath(args[3]);
statusMonitor.setResourcePath(Arrays.copyOfRange(args, 4, args.length)); statusMonitor.connect(); Executors.newSingleThreadExecutor().awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (Exception e) {
logger.error(e.toString(), e);
statusMonitor.stop();
System.exit(0);
} catch (OutOfMemoryError e) {
logger.error(e.toString(), e);
statusMonitor.stop();
System.exit(0);
}
}}

#!/bin/sh
#zookeeper address
zkAddress=100.69.209.30:2181,100.69.207.28:2181,100.69.193.87:2181
#connection timeout  in  Millseconds
connectTimeOut=60000
#session timeout  in  Millseconds
sessionTimeOut=60000
zookeeperPath=”/tmc-city-path-aone”

x=`echo $0 | grep “^/”`
if test “${x}”; then
    dir=`dirname $0`
else
    pwdv=`pwd`
    dir=`dirname ${pwdv}/$0`
fi
dir=`readlink -m $dir`
echo “app location : “$dir

run=”nohup /opt/taobao/java/bin/java  -Xms2G  -Xmx6G -Duser.dir=$dir/..  -cp ${dir}/../etc:${dir}/../lib/* com.autonavi.tinfo.traffic.zookeeper.DistributedZookeeper $zkAddress $connectTimeOut $sessionTimeOut $zookeeperPath classpath:ctx/**/*.xml”
log=”nohup.out”
app_dir=`echo $dir|awk -F’/’ ‘{print $(NF-1)}’`
len=`expr ${#app_dir} / 3`
app_dir_blur=`expr substr $app_dir 1 $len`
shutdown=”kill `ps -ef|grep /opt/taobao/java/bin/java  |grep $dir| awk ‘{print $2}’`”
sd_rb=”kill `ps -ef|grep /opt/taobao/java/bin/java  |grep $app_dir_blur| awk ‘{print $2}’`”

stopApp() {
    echo “starting stop …”
    pid=`ps -ef|grep /opt/taobao/java/bin/java |grep $dir| awk ‘{print $2}’`
    echo $pid
    if [ ! $pid ]; then
        echo “not find process to kill”
    else
        kill -9 $pid
        echo “kill -9 $pid successfully”
    fi
}

case $1 in
    start)
        $run >> $log 2>&1 &
        chmod 744  $log
        ;;
    stop)
        #$shutdown
        stopApp
        ;;
    restart)
        $shutdown &&
        $run >> $log 2>&1 &
        ;;
    rb)
        $sd_rb &&
        $run >> $log 2>&1 &
        ;;
    *)
        echo “usage: run.sh [start|stop|restart]”
esac

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,489
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,904
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,737
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,490
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:8,128
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:5,291