zookeeper实战源码   

源码内容如下:

  • 封装zk操作工具类
  • Curator操作zk类
  • 分布式锁
  • Watcher监听
  • Leader选举
/**
 * zk操作工具类
 * @author yinjihuan
 */
public class ZkUtils {

    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZkUtils instance = new ZkUtils();

    private static ZooKeeper zk;

    public synchronized static ZkUtils getInstance(String host, int port) {
        if (zk == null) {
            connect(host, port);
        }
        return instance;
    }

    private static void connect(String host, int port) {
        String connectString = host + ":" + port;
        try {
            zk = new ZooKeeper(connectString, Constant.ZK_SESSION_TIMEOUT, new Watcher() {

                @Override
                public void process(WatchedEvent event) {
                    System.out.println("已经触发了" + event.getType() + "事件!"); 
                    //判断是否已连接ZK,连接后计数器递减.
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }

            });
            //若计数器不为0,则等待
            latch.await();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String addNode(String nodeName) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public String addNode(String nodeName, String data) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public String addNode(String nodeName, String data, List<ACL> acl, CreateMode createMode) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, data.getBytes(), acl, createMode);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void removeNode(String nodeName) {
        try {
            zk.delete(nodeName, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public void removeNode(String nodeName, int version) {
        try {
            zk.delete(nodeName, version);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 监控数据节点变化
     * @param nodeName
     */
    public void monitorDataUpdate(String nodeName) {
        try {
            zk.getData(nodeName, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    //节点的值有修改
                    if(event.getType() == EventType.NodeDataChanged) {
                        System.out.println(nodeName + "修改了值"+event.getPath());
                        //触发一次就失效,所以需要递归注册
                        monitorDataUpdate(nodeName);
                    }
                }
            }, new Stat());
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}