本文共 42972 字,大约阅读时间需要 143 分钟。
znode是zooKeeper集合的核心组件,zookeeper API提供了一小组方法使用zookeeper集合来操纵znode的所有细节。
客户端应该遵循以下步骤,与zookeeper服务器进行清晰和干净的交互
注意:虚拟机上的zookeeper必须处于开启状态。
Zookeeper(String connectionString,int seesionTimeout,Watcher watcher)
编写 ZookeeperConnection.java测试是否能成功连接到Zookeeper
package com.donggua.zookeeper;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.util.concurrent.CountDownLatch;public class ZookeeperConnection { public static void main(String[] args) { try { //计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); //参数1:服务器的ip和端口 //参数2:客户端与服务器之间的会话超时时间,以毫秒为单位的 //参数3:监视器对象 //Zookeeper对象是以异步的方式创建的 ZooKeeper zooKeeper = new ZooKeeper("172.16.114.135:2181", 5000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected){ //说明客户端与服务端连接对象创建成功了 System.out.println("连接创建成功"); //通知计数器对象不要再阻塞主线程了,可以继续往下执行了。 countDownLatch.countDown(); } } }); //主线程阻塞等待连接对象的创建成功 countDownLatch.await(); //打印客户端和服务器的会话ID System.out.println(zooKeeper.getSessionId()); //释放资源 zooKeeper.close(); }catch (Exception e){ e.printStackTrace(); } }}
可以发现,已经成功连接到zookeeper服务器了。
//同步方式create(String path,byte[] data,Listacl CreateMode createMode)//异步方式create(String path,byte[] data,List acl CreateMode createMode,AsyncCallback.StringCallback callback,Object ctx)
1.创建节点
package com.donggua.zookeeper;import org.apache.zookeeper.*;import org.apache.zookeeper.data.ACL;import org.apache.zookeeper.data.Id;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch;public class ZKCreate { String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper; @Before public void before() throws Exception{ //计数器对象 CountDownLatch countDownLatch = new CountDownLatch(1); //参数1:服务器的ip和端口 //参数2:客户端与服务器之间的会话超时时间,以毫秒为单位的 //参数3:监视器对象 //Zookeeper对象是以异步的方式创建的 zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected){ //说明客户端与服务端连接对象创建成功了 System.out.println("连接创建成功"); //通知计数器对象不要再阻塞主线程了,可以继续往下执行了。 countDownLatch.countDown(); } } }); //主线程阻塞等待连接对象的创建成功 countDownLatch.await(); } @After public void after() throws Exception{ zooKeeper.close(); } @Test public void create1() throws Exception{ //同步创建节点 // arg1:节点的路径 // arg2:节点的数据 // arg3:权限列表 world:anyone:cdrwa // arg4:节点类型 持久化节点 zooKeeper.create("/create/node1","node1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @Test public void create2() throws Exception { // Ids.READ_ACL_UNSAFE 对应权限 world:anyone:r zooKeeper.create("/create/node2", "node2".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); } @Test public void create3() throws Exception { // world授权模式 // 权限列表 Listacls = new ArrayList (); // 授权模式和授权对象 Id id = new Id("world", "anyone"); // 权限设置 acls.add(new ACL(ZooDefs.Perms.READ, id)); acls.add(new ACL(ZooDefs.Perms.WRITE, id)); zooKeeper.create("/create/node3", "node3".getBytes(), acls, CreateMode.PERSISTENT); } @Test public void create4() throws Exception { // ip授权模式 // 权限列表 List acls = new ArrayList (); // 授权模式和授权对象 Id id = new Id("ip", "192.168.188.133"); // 权限设置 acls.add(new ACL(ZooDefs.Perms.ALL, id)); zooKeeper.create("/create/node4", "node4".getBytes(), acls, CreateMode.PERSISTENT); } @Test public void create5() throws Exception { // auth授权模式 // 添加授权用户 zooKeeper.addAuthInfo("digest", "itcast:123456".getBytes()); zooKeeper.create("/create/node5", "node5".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); } @Test public void create6() throws Exception { // auth授权模式 // 添加授权用户 zooKeeper.addAuthInfo("digest", "itcast:123456".getBytes()); // 权限列表 List acls = new ArrayList (); // 授权模式和授权对象 Id id = new Id("auth", "itcast"); // 权限设置 acls.add(new ACL(ZooDefs.Perms.READ, id)); zooKeeper.create("/create/node6", "node6".getBytes(), acls, CreateMode.PERSISTENT); } @Test public void create7() throws Exception { // digest授权模式 // 权限列表 List acls = new ArrayList (); // 授权模式和授权对象 Id id = new Id("digest", "sky:mjsqJQ8e6gZDKVF+t9LAARPOBjc="); // 权限设置 acls.add(new ACL(ZooDefs.Perms.ALL, id)); zooKeeper.create("/create/node7", "node7".getBytes(), acls, CreateMode.PERSISTENT); } @Test public void create8() throws Exception { // 持久化顺序节点 // Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa String result = zooKeeper.create("/create/node8", "node8".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println(result); } @Test public void create9() throws Exception { // 临时节点 // Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa String result = zooKeeper.create("/create/node9", "node9".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(result); } @Test public void create10() throws Exception { // 临时顺序节点 // Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa String result = zooKeeper.create("/create/node10", "node10".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(result); } @Test public void create11() throws Exception { // 异步方式创建节点 zooKeeper.create("/create/node11", "node11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { // 0 代表创建成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 节点的路径 System.out.println(name); // 上下文参数 System.out.println(ctx); } }, "I am context"); Thread.sleep(10000); System.out.println("结束"); }}
在虚拟机中获取节点的信息,如下图所示:
//同步方式setData(String path,byte[] data,int version)//异步方式setData(String path,byte[] data,int version,AsyncCallback.StatCallback callBack,Object ctx)
代码如下:
package com.donggua.zookeeper;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.util.concurrent.CountDownLatch;public class ZKSet { String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper; @Before public void before() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); // arg1:zookeeper服务器的ip地址和端口号 // arg2:连接的超时时间 以毫秒为单位 // arg3:监听器对象 zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接创建成功!"); countDownLatch.countDown(); } } }); // 使主线程阻塞等待 countDownLatch.await(); } @After public void after() throws Exception { zooKeeper.close(); } @Test public void set1() throws Exception { // arg1:节点的路径 // arg2:修改的数据 // arg3:数据版本号 -1代表版本号不参与更新 Stat stat = zooKeeper.setData("/set/node1", "node13".getBytes(), -1); // 当前节点的版本号 System.out.println(stat.getVersion()); } @Test public void set2() throws Exception { zooKeeper.setData("/set/node1", "node14".getBytes(), -1, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { // 0代表修改成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数对象 System.out.println(ctx); // 属性描述对象 System.out.println(stat.getVersion()); } }, "I am Context"); Thread.sleep(10000); System.out.println("结束"); }}
// 同步方式delete(String path, int version)// 异步方式delete(String path, int version, AsyncCallback.VoidCallback callBack,Object ctx)
代码如下:
import org.apache.zookeeper.AsyncCallback;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.util.concurrent.CountDownLatch;public class ZKDelete { String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper; @Before public void before() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); // arg1:zookeeper服务器的ip地址和端口号 // arg2:连接的超时时间 以毫秒为单位 // arg3:监听器对象 zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接创建成功!"); countDownLatch.countDown(); } } }); // 使主线程阻塞等待 countDownLatch.await(); } @After public void after() throws Exception { zooKeeper.close(); } @Test public void delete1() throws Exception { // arg1:删除节点的节点路径 // arg2:数据版本信息 -1代表删除节点时不考虑版本信息 zooKeeper.delete("/delete/node1",-1); } @Test public void delete2() throws Exception { // 异步使用方式 zooKeeper.delete("/delete/node2", -1, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { // 0代表删除成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数对象 System.out.println(ctx); } },"I am Context"); Thread.sleep(10000); System.out.println("结束"); }}
// 同步方式getData(String path, boolean b, Stat stat)// 异步方式getData(String path, boolean b,AsyncCallback.DataCallback callBack,Object ctx)
代码如下:
import org.apache.zookeeper.AsyncCallback;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.util.concurrent.CountDownLatch;public class ZKGet { String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper; @Before public void before() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); // arg1:zookeeper服务器的ip地址和端口号 // arg2:连接的超时时间 以毫秒为单位 // arg3:监听器对象 zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接创建成功!"); countDownLatch.countDown(); } } }); // 使主线程阻塞等待 countDownLatch.await(); } @After public void after() throws Exception { zooKeeper.close(); } @Test public void get1() throws Exception { // arg1:节点的路径 // arg3:读取节点属性的对象 Stat stat=new Stat(); byte [] bys=zooKeeper.getData("/get/node1",false,stat); // 打印数据 System.out.println(new String(bys)); // 版本信息 System.out.println(stat.getVersion()); } @Test public void get2() throws Exception { //异步方式 zooKeeper.getData("/get/node1", false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { // 0代表读取成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数对象 System.out.println(ctx); // 数据 System.out.println(new String(data)); // 属性对象 System.out.println(stat.getVersion()); } },"I am Context"); Thread.sleep(10000); System.out.println("结束"); }}
// 同步方式getChildren(String path, boolean b)// 异步方式getChildren(String path, boolean b,AsyncCallback.ChildrenCallbackcallBack,Object ctx)
代码如下:
import org.apache.zookeeper.AsyncCallback;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.util.List;import java.util.concurrent.CountDownLatch;public class ZKGetChid { String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper; @Before public void before() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); // arg1:zookeeper服务器的ip地址和端口号 // arg2:连接的超时时间 以毫秒为单位 // arg3:监听器对象 zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接创建成功!"); countDownLatch.countDown(); } } }); // 使主线程阻塞等待 countDownLatch.await(); } @After public void after() throws Exception { zooKeeper.close(); } @Test public void get1() throws Exception { // arg1:节点的路径 Listlist = zooKeeper.getChildren("/get", false); for (String str : list) { System.out.println(str); } } @Test public void get2() throws Exception { // 异步用法 zooKeeper.getChildren("/get", false, new AsyncCallback.ChildrenCallback() { @Override public void processResult(int rc, String path, Object ctx, List children) { // 0代表读取成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数对象 System.out.println(ctx); // 子节点信息 for (String str : children) { System.out.println(str); } } },"I am Context"); Thread.sleep(10000); System.out.println("结束"); }}
// 同步方法exists(String path, boolean b)// 异步方法exists(String path, boolean b,AsyncCallback.StatCallback callBack,Objectctx)
public class ZKExists { String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper; @Before public void before() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); // arg1:zookeeper服务器的ip地址和端口号 // arg2:连接的超时时间 以毫秒为单位 // arg3:监听器对象 zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接创建成功!"); countDownLatch.countDown(); } } }); // 使主线程阻塞等待 countDownLatch.await(); } @After public void after() throws Exception { zooKeeper.close(); } @Test public void exists1() throws Exception { // arg1:节点的路径 Stat stat=zooKeeper.exists("/exists1",false); // 节点的版本信息 System.out.println(stat.getVersion()); } @Test public void exists2() throws Exception { // 异步方式 zooKeeper.exists("/exists1", false, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { // 0 代表方式执行成功 System.out.println(rc); // 节点的路径 System.out.println(path); // 上下文参数 System.out.println(ctx); // 节点的版本信息 System.out.println(stat.getVersion()); } },"I am Context"); Thread.sleep(10000); System.out.println("结束"); }}
zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者
zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。
watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式。
Watcher实现由三个部分组成:
客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程。
特性 | 说明 |
---|---|
一次性 | watcher 是一次性的,一旦被触发就会移除,再次使用时需要重新注册 |
客户端顺序回调 | watcher 回调是顺序串行执行的,只有回调后客户端才能看到最新的数据状态。一个watcher 回调逻辑不应该太多,以免影响别的watcher 执行 |
轻量级 | WatchEvent 是最小的通信单位,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容 |
时效性 | watcher 只有在当前session 彻底失效时才会无效,若在session 有效期内快速重连成功,则watcher 依然存在,仍可接收到通知; |
Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类:KeeperState、EventType。
Watcher通知状态(KeeperState)KeeperState
是客户端与服务端连接状态发生变化时对应的通知类型。路径为org.apache.zookeeper.Watcher.EventKeeperState
,是一个枚举类,其枚举属性如下: 枚举属性 | 说明 |
---|---|
SyncConnected | 客户端与服务器正常连接时 |
Disconnected | 客户端与服务器断开连接时 |
Expired | 会话session 失效时 |
AuthFailed | 身份认证失败时 |
Watcher事件类型(EventType)
EventType
是数据节点znode
发生变化时对应的通知类型。EventType
变化时KeeperState
永远处于SyncConnected
通知状态下;当keeperState
发生变化时,EventType
永远为None
。其路径为org.apache.zookeeper.Watcher.Event.EventType
,是一个枚举类,枚举属性如下:
枚举属性 | 说明 |
---|---|
None | 无 |
NodeCreated | Watcher 监听的数据节点被创建时 |
NodeDeleted | Watcher 监听的数据节点被删除时 |
NodeDataChanged | Watcher 监听的数据节点内容发生更改时(无论数据是否真的变化) |
NodeChildrenChanged | Watcher 监听的数据节点的子节点列表发生变更时 |
注意:客户端接收到的相关事件通知中只包含状态以及类型等信息,不包含节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需要调用get
等方法重新获取
上面讲到zookeeper
客户端连接的状态和zookeeper
对znode
节点监听的事件类型,下面我们来讲解如何建立zookeeper
的watcher
监听。在zookeeper
中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)
这样的方式来为某个znode
注册监听 。
下表以node-x
节点为例,说明调用的注册方法和可用监听事件间的关系:
注册方式 | created | childrenChanged | Changed | Deleted |
---|---|---|---|---|
zk.exists("/node-x",watcher) | 可监控 | 可监控 | 可监控 | |
zk.getData("/node-x",watcher) | 可监控 | 可监控 | ||
zk.getChildren("/node-x",watcher) | 可监控 | 可监控 |
KeeperState
:通知状态
SyncConnected
:客户端与服务器正常连接时
Disconnected
:客户端与服务器断开连接时
Expired
:会话session
失效时
AuthFailed
:身份认证失败时
事件类型为:None
代码如下:
public class ZKConnectionWatcher implements Watcher{ //计数器对象 static CountDownLatch countDownLatch = new CountDownLatch(1); //连接对象 static ZooKeeper zooKeeper; public static void main(String[] args) { try{ zooKeeper = new ZooKeeper("172.16.114.135:2181",5000,new ZKConnectionWatcher()); //阻塞线程等待连接的创建 countDownLatch.await(); //会话ID System.out.println(zooKeeper.getSessionId()); // 添加授权用户 zooKeeper.addAuthInfo("digest1","shaoyi:123456".getBytes()); byte [] bs=zooKeeper.getData("/node1",false,null); System.out.println(new String(bs)); Thread.sleep(500000); //释放资源 zooKeeper.close(); System.out.println("结束"); }catch (Exception e){ e.printStackTrace(); } } @Override public void process(WatchedEvent event) { try { //事件类型 if(event.getType() == Event.EventType.None){ if(event.getState() == Event.KeeperState.SyncConnected){ System.out.println("连接创建成功"); //通知正在等待的线程,继续向下执行 countDownLatch.countDown(); }else if(event.getState()==Event.KeeperState.Disconnected){ System.out.println("断开连接"); }else if(event.getState()==Event.KeeperState.Expired){ System.out.println("会话超时!"); }else if(event.getState() == Event.KeeperState.AuthFailed){ System.out.println("认证失败"); } } }catch (Exception e){ e.printStackTrace(); } }}
// 使用连接对象的监视器exists(String path, boolean b)// 自定义监视器exists(String path, Watcher w) 可以捕获以下节点的状态:// NodeCreated:节点创建// NodeDeleted:节点删除// NodeDataChanged:节点内容发生变化
代码如下:
public class ZKWatcherExists { private String IP = "172.16.114.135:2181"; private ZooKeeper zooKeeper = null; @Before public void before() throws IOException, InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); // 连接zookeeper客户端 zooKeeper = new ZooKeeper(IP, 6000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("连接对象的参数!"); // 连接成功 if (event.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }); countDownLatch.await(); } @After public void after() throws InterruptedException { zooKeeper.close(); } @Test public void watcherExists1() throws KeeperException, InterruptedException { // arg1:节点的路径 // arg2:使用连接对象中的watcher zooKeeper.exists("/watcher1", true); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherExists2() throws KeeperException, InterruptedException { // arg1:节点的路径 // arg2:自定义watcher对象 zooKeeper.exists("/watcher1", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("自定义watcher"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherExists3() throws KeeperException, InterruptedException { // watcher一次性 Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { try { System.out.println("自定义watcher"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); //可以多次捕获/watcher1结点的变化 zooKeeper.exists("/watcher1", this); } catch (Exception ex) { ex.printStackTrace(); } } }; zooKeeper.exists("/watcher1", watcher); Thread.sleep(80000); System.out.println("结束"); } @Test public void watcherExists4() throws KeeperException, InterruptedException { // 注册多个监听器对象 zooKeeper.exists("/watcher1", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("1"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }); zooKeeper.exists("/watcher1", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("2"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }); Thread.sleep(80000); System.out.println("结束"); }}
当结点发生创建、修改、删除时,都会触发Watch监听器的执行。监听是一次性的。
// 使用连接对象的监视器getData(String path, boolean b, Stat stat)// 自定义监视器getData(String path, Watcher w, Stat stat) 可以捕获以下节点的状态: // NodeDeleted:节点删除// NodeDataChanged:节点内容发生变化
代码如下:
package com.donggua.watcher;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.io.IOException;import java.util.concurrent.CountDownLatch;public class ZKWatcherGetData { private String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper = null; @Before public void before() throws IOException, InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); // 连接zookeeper客户端 zooKeeper = new ZooKeeper(IP, 6000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("连接对象的参数!"); // 连接成功 if (event.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }); countDownLatch.await(); } @After public void after() throws InterruptedException { zooKeeper.close(); } @Test public void watcherGetData1() throws KeeperException, InterruptedException { // arg1:节点的路径 // arg2:使用连接对象中的watcher zooKeeper.getData("/watcher2", true, null); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherGetData2() throws KeeperException, InterruptedException { // arg1:节点的路径 // arg2:自定义watcher对象 zooKeeper.getData("/watcher2", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("自定义watcher"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }, null); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherGetData3() throws KeeperException, InterruptedException { // 一次性 Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { try { System.out.println("自定义watcher"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); if(event.getType()==Event.EventType.NodeDataChanged) { zooKeeper.getData("/watcher2", this, null); } } catch (Exception ex) { ex.printStackTrace(); } } }; zooKeeper.getData("/watcher2", watcher, null); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherGetData4() throws KeeperException, InterruptedException { // 注册多个监听器对象 zooKeeper.getData("/watcher2", new Watcher() { @Override public void process(WatchedEvent event) { try { System.out.println("1"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); if(event.getType()==Event.EventType.NodeDataChanged) { zooKeeper.getData("/watcher2", this, null); } } catch (Exception ex) { ex.printStackTrace(); } } },null); zooKeeper.getData("/watcher2", new Watcher() { @Override public void process(WatchedEvent event) { try { System.out.println("2"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); if(event.getType()==Event.EventType.NodeDataChanged) { zooKeeper.getData("/watcher2", this, null); } } catch (Exception ex) { ex.printStackTrace(); } } },null); Thread.sleep(50000); System.out.println("结束"); }}
// 使用连接对象的监视器getChildren(String path, boolean b)// 自定义监视器getChildren(String path, Watcher w) 可以捕获下列节点的状态// NodeChildrenChanged:子节点发生变化// NodeDeleted: 当前节点删除
代码如下:
package com.donggua.watcher;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.junit.After;import org.junit.Before;import org.junit.Test;import java.io.IOException;import java.util.List;import java.util.concurrent.CountDownLatch;public class ZKWatcherGetChild { String IP = "172.16.114.135:2181"; ZooKeeper zooKeeper = null; @Before public void before() throws IOException, InterruptedException { CountDownLatch connectedSemaphore = new CountDownLatch(1); // 连接zookeeper客户端 zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("连接对象的参数!"); // 连接成功 if (event.getState() == Event.KeeperState.SyncConnected) { connectedSemaphore.countDown(); } System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }); connectedSemaphore.await(); } @After public void after() throws InterruptedException { zooKeeper.close(); } @Test public void watcherGetChild1() throws KeeperException, InterruptedException { // arg1:节点的路径 // arg2:使用连接对象中的watcher zooKeeper.getChildren("/watcher3", true); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherGetChild2() throws KeeperException, InterruptedException { // arg1:节点的路径 // arg2:自定义watcher zooKeeper.getChildren("/watcher3", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("自定义watcher"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); } }); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherGetChild3() throws KeeperException, InterruptedException { // 一次性 Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { try { System.out.println("自定义watcher"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); if (event.getType() == Event.EventType.NodeChildrenChanged) { zooKeeper.getChildren("/watcher3", this); } } catch (Exception ex) { ex.printStackTrace(); } } }; zooKeeper.getChildren("/watcher3", watcher); Thread.sleep(50000); System.out.println("结束"); } @Test public void watcherGetChild4() throws KeeperException, InterruptedException { // 多个监视器对象 zooKeeper.getChildren("/watcher3", new Watcher() { @Override public void process(WatchedEvent event) { try { System.out.println("1"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); if (event.getType() == Event.EventType.NodeChildrenChanged) { zooKeeper.getChildren("/watcher3", this); } } catch (Exception ex) { ex.printStackTrace(); } } }); zooKeeper.getChildren("/watcher3", new Watcher() { @Override public void process(WatchedEvent event) { try { System.out.println("2"); System.out.println("path=" + event.getPath()); System.out.println("eventType=" + event.getType()); if (event.getType() == Event.EventType.NodeChildrenChanged) { zooKeeper.getChildren("/watcher3", this); } } catch (Exception ex) { ex.printStackTrace(); } } }); Thread.sleep(50000); System.out.println("结束"); }}
工作中有这样的一个场景: 数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存。
若数据库的用户名和密码改变时候,还需要重新加载缓存,比较麻烦,通过ZooKeeper可以轻松完成,当数据库发生变化时自动完成缓存同步。
设计思路:
代码如下:
public class MyConfigCenter implements Watcher { // zk的连接串 private String IP = "172.16.114.135:2181"; // 计数器对象 private CountDownLatch countDownLatch = new CountDownLatch(1); // 连接对象 private static ZooKeeper zooKeeper; // 用于本地化存储配置信息 private String url; private String username; private String password; @Override public void process(WatchedEvent event) { try { // 捕获事件状态 if (event.getType() == EventType.None) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接成功"); countDownLatch.countDown(); } else if (event.getState() == Event.KeeperState.Disconnected) { System.out.println("连接断开!"); } else if (event.getState() == Event.KeeperState.Expired) { System.out.println("连接超时!"); // 超时后服务器端已经将连接释放,需要重新连接服务器端 zooKeeper = new ZooKeeper(IP, 6000, new ZKConnectionWatcher()); } else if (event.getState() == Event.KeeperState.AuthFailed) { System.out.println("验证失败!"); } // 当配置信息发生变化时 } else if (event.getType() == EventType.NodeDataChanged) { initValue(); } } catch (Exception ex) { ex.printStackTrace(); } } // 构造方法 private MyConfigCenter() { try { // 创建连接对象 zooKeeper = new ZooKeeper(IP, 5000, this); // 阻塞线程,等待连接的创建成功 countDownLatch.await(); initValue(); }catch (Exception e){ e.printStackTrace(); } } // 连接zookeeper服务器,读取配置信息 private void initValue() { try { // 读取配置信息 this.url = new String(zooKeeper.getData("/config/url", true, null)); this.username = new String(zooKeeper.getData("/config/username", true, null)); this.password = new String(zooKeeper.getData("/config/password", true, null)); } catch (Exception ex) { ex.printStackTrace(); } } public static void main(String[] args) { try { MyConfigCenter myConfigCenter = new MyConfigCenter(); for (int i = 1; i <= 20; i++) { Thread.sleep(5000); System.out.println("url:"+myConfigCenter.getUrl()); System.out.println("username:"+myConfigCenter.getUsername()); System.out.println("password:"+myConfigCenter.getPassword()); System.out.println("########################################"); } } catch (Exception ex) { ex.printStackTrace(); } } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; }}
可以发现,当在虚拟机修改用户名和密码时,连接该服务器的客户端,可以第一时间收到修改后的用户名和密码。
在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID。
设计思路:
代码如下:在这里插入代码片
分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具ZooKeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如何实现排他锁。
设计思路:
代码如下:
public class MyLock { // zk的连接串 private String IP = "172.16.114.135:2181"; // 计数器对象 private CountDownLatch countDownLatch = new CountDownLatch(1); //ZooKeeper配置信息 private ZooKeeper zooKeeper; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath; // 打开zookeeper连接 MyLock() { try { zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.None) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("连接成功!"); countDownLatch.countDown(); } } } }); countDownLatch.await(); } catch (Exception ex) { ex.printStackTrace(); } } //获取锁 public void acquireLock() throws Exception { //创建锁节点 createLock(); //尝试获取锁 attemptLock(); } //创建锁节点 private void createLock() throws Exception { //判断Locks是否存在,不存在创建 Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false); if (stat == null) { zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 创建临时有序节点 lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("节点创建成功:" + lockPath); } //监视器对象,监视上一个节点是否被删除 private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { //同步代码块 苏醒线程,线程继续往下执行。 synchronized (this) { notifyAll(); } } } }; //尝试获取锁 private void attemptLock() throws Exception { // 获取Locks节点下的所有子节点 Listlist = zooKeeper.getChildren(LOCK_ROOT_PATH, false); // 对子节点进行排序 Collections.sort(list); // /Locks/Lock_000000001 将/Locks截取掉 int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); if (index == 0) { System.out.println("获取锁成功!"); } else { // 上一个节点的路径 String path = list.get(index - 1); Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher); if (stat == null) { attemptLock(); } else { //同步代码块阻塞线程 synchronized (watcher) { watcher.wait(); } attemptLock(); } } } //释放锁 public void releaseLock() throws Exception { //删除临时有序节点 zooKeeper.delete(this.lockPath,-1); zooKeeper.close(); System.out.println("锁已经释放:"+this.lockPath); } public static void main(String[] args) { try { MyLock myLock = new MyLock(); myLock.createLock(); } catch (Exception ex) { ex.printStackTrace(); } }}
测试分布式锁:
public class TicketSeller { private void sell(){ System.out.println("售票开始"); // 线程随机休眠数毫秒,模拟现实中的费时操作 int sleepMillis = 5000; try { //代表复杂逻辑执行了一段时间 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票结束"); } private void sellTicketWithLock() throws Exception { MyLock lock = new MyLock(); // 获取锁 lock.acquireLock(); sell(); //释放锁 lock.releaseLock(); } public static void main(String[] args) throws Exception { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<10;i++){ ticketSeller.sellTicketWithLock(); } }}可以发现,客户端是交替执行的,当一个客户端获取到sell方法执行权限时,另外一个客户端是处于等待状态的,当这个客户端释放掉锁的时候,另外一个客户端才能够执行sell方法。
转载地址:http://ipiwi.baihongyu.com/