关键词:
ZooKeeper客户端可以对指定节点设置指定Watcher,当服务器指定节点发生变化是,客户端会收到服务器的通知,然后客户端可以执行相应Watcher的代码。
默认ZooKeeper内置了一个watcher,用于打印收到的服务器的通知。
源码ZooKeeperMain.Watcher:
1 protected void connectToZK(String newHost) throws InterruptedException, IOException { 2 if (zk != null && zk.getState().isAlive()) { 3 zk.close(); 4 } 5 host = newHost; 6 zk = new ZooKeeper(host, 7 Integer.parseInt(cl.getOption("timeout")), 8 new MyWatcher()); 9 } 10 11 private class MyWatcher implements Watcher { 12 public void process(WatchedEvent event) { 13 if (getPrintWatches()) { 14 ZooKeeperMain.printMessage("WATCHER::"); 15 ZooKeeperMain.printMessage(event.toString()); 16 } 17 } 18 }
在获取子节点、获取数据、获取状态可以设置Watcher,该Watcher会被存储到Packet包中,当Packet包收到响应时注册该Watcher,当收到服务器notification时,执行Watcher代码。
源码ZooKeeper.getChildren:
1 public List<String> getChildren(final String path, Watcher watcher) 2 throws KeeperException, InterruptedException 3 { 4 //将watcher封装成childwatcher 5 WatchRegistration wcb = null; 6 if (watcher != null) { 7 wcb = new ChildWatchRegistration(watcher, clientPath); 8 } 9 10 final String serverPath = prependChroot(path); 11 RequestHeader h = new RequestHeader(); 12 h.setType(ZooDefs.OpCode.getChildren); 13 GetChildrenRequest request = new GetChildrenRequest(); 14 request.setPath(serverPath); 15 request.setWatch(watcher != null); 16 GetChildrenResponse response = new GetChildrenResponse(); 17 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 18 if (r.getErr() != 0) { 19 throw KeeperException.create(KeeperException.Code.get(r.getErr()), 20 clientPath); 21 } 22 return response.getChildren(); 23 }
源码ClientCnxn.submitRequest:
1 public ReplyHeader submitRequest(RequestHeader h, Record request, 2 Record response, WatchRegistration watchRegistration) 3 throws InterruptedException { 4 ReplyHeader r = new ReplyHeader(); 5 //watchRegistration被封装到Packet中 6 Packet packet = queuePacket(h, r, request, response, null, null, null, 7 null, watchRegistration); 8 synchronized (packet) { 9 while (!packet.finished) { 10 packet.wait(); 11 } 12 } 13 return r; 14 }
源码ClientCnxn.queuePacket:
1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, 2 Record response, AsyncCallback cb, String clientPath, 3 String serverPath, Object ctx, WatchRegistration watchRegistration) 4 { 5 Packet packet = null; 6 synchronized (outgoingQueue) { 7 if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { 8 h.setXid(getXid()); 9 } 10 //watchRegistration被封装到Packet中 11 packet = new Packet(h, r, request, response, null, 12 watchRegistration); 13 packet.cb = cb; 14 packet.ctx = ctx; 15 packet.clientPath = clientPath; 16 packet.serverPath = serverPath; 17 if (!zooKeeper.state.isAlive() || closing) { 18 conLossPacket(packet); 19 } else { 20 // If the client is asking to close the session then 21 // mark as closing 22 if (h.getType() == OpCode.closeSession) { 23 closing = true; 24 } 25 outgoingQueue.add(packet); 26 } 27 } 28 29 sendThread.wakeup(); 30 return packet; 31 }
当Packet包收到响应时注册该Watcher,源码ClientCnxn.finishPacket:
1 private void finishPacket(Packet p) { 2 if (p.watchRegistration != null) { 3 p.watchRegistration.register(p.replyHeader.getErr()); 4 } 5 6 if (p.cb == null) { 7 synchronized (p) { 8 p.finished = true; 9 p.notifyAll(); 10 } 11 } else { 12 p.finished = true; 13 eventThread.queuePacket(p); 14 } 15 }
当收到服务器notification时,执行Watcher代码,源码ClientCnxn.EventThread:
1 private void processEvent(Object event) { 2 if (event instanceof WatcherSetEventPair) { 3 //执行watcher 4 WatcherSetEventPair pair = (WatcherSetEventPair) event; 5 for (Watcher watcher : pair.watchers) { 6 try { 7 watcher.process(pair.event); 8 } catch (Throwable t) { 9 LOG.error("Error while calling watcher ", t); 10 } 11 } 12 } 13 }
在删除节点、创建节点、获取子节点、设置数据、获取数据、获取权限、设置权限等异步操作时,可以设置CallBack回调函数,该回调对象会被存储到Packet包中,当Packet包收到响应时执行CallBack代码。
源码ZooKeeper.getChildren:
1 public void getChildren(final String path, Watcher watcher, 2 ChildrenCallback cb, Object ctx) 3 { 4 final String clientPath = path; 5 WatchRegistration wcb = null; 6 if (watcher != null) { 7 wcb = new ChildWatchRegistration(watcher, clientPath); 8 } 9 10 final String serverPath = prependChroot(clientPath); 11 12 RequestHeader h = new RequestHeader(); 13 h.setType(ZooDefs.OpCode.getChildren); 14 GetChildrenRequest request = new GetChildrenRequest(); 15 request.setPath(serverPath); 16 request.setWatch(watcher != null); 17 GetChildrenResponse response = new GetChildrenResponse(); 18 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 19 clientPath, serverPath, ctx, wcb); 20 }
源码ClientCnxn.queuePacket:
1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, 2 Record response, AsyncCallback cb, String clientPath, 3 String serverPath, Object ctx, WatchRegistration watchRegistration) 4 { 5 Packet packet = null; 6 synchronized (outgoingQueue) { 7 if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { 8 h.setXid(getXid()); 9 } 10 packet = new Packet(h, r, request, response, null, 11 watchRegistration); 12 packet.cb = cb; 13 packet.ctx = ctx; 14 packet.clientPath = clientPath; 15 packet.serverPath = serverPath; 16 if (!zooKeeper.state.isAlive() || closing) { 17 conLossPacket(packet); 18 } else { 19 // If the client is asking to close the session then 20 // mark as closing 21 if (h.getType() == OpCode.closeSession) { 22 closing = true; 23 } 24 outgoingQueue.add(packet); 25 } 26 } 27 28 sendThread.wakeup(); 29 return packet; 30 }
源码ClientCnxn.EventThread:
1 private void processEvent(Object event) { 2 try { 3 Packet p = (Packet) event; 4 int rc = 0; 5 String clientPath = p.clientPath; 6 if (p.replyHeader.getErr() != 0) { 7 rc = p.replyHeader.getErr(); 8 } 9 if (p.response instanceof ExistsResponse 10 || p.response instanceof SetDataResponse 11 || p.response instanceof SetACLResponse) { 12 StatCallback cb = (StatCallback) p.cb; 13 if (rc == 0) { 14 if (p.response instanceof ExistsResponse) { 15 cb.processResult(rc, clientPath, p.ctx, 16 ((ExistsResponse) p.response) 17 .getStat()); 18 } else if (p.response instanceof SetDataResponse) { 19 cb.processResult(rc, clientPath, p.ctx, 20 ((SetDataResponse) p.response) 21 .getStat()); 22 } else if (p.response instanceof SetACLResponse) { 23 cb.processResult(rc, clientPath, p.ctx, 24 ((SetACLResponse) p.response) 25 .getStat()); 26 } 27 } else { 28 cb.processResult(rc, clientPath, p.ctx, null); 29 } 30 } else if (p.response instanceof GetDataResponse) { 31 DataCallback cb = (DataCallback) p.cb; 32 GetDataResponse rsp = (GetDataResponse) p.response; 33 if (rc == 0) { 34 cb.processResult(rc, clientPath, p.ctx, rsp 35 .getData(), rsp.getStat()); 36 } else { 37 cb.processResult(rc, clientPath, p.ctx, null, 38 null); 39 } 40 } else if (p.response instanceof GetACLResponse) { 41 ACLCallback cb = (ACLCallback) p.cb; 42 GetACLResponse rsp = (GetACLResponse) p.response; 43 if (rc == 0) { 44 cb.processResult(rc, clientPath, p.ctx, rsp 45 .getAcl(), rsp.getStat()); 46 } else { 47 cb.processResult(rc, clientPath, p.ctx, null, 48 null); 49 } 50 } else if (p.response instanceof GetChildrenResponse) { 51 ChildrenCallback cb = (ChildrenCallback) p.cb; 52 GetChildrenResponse rsp = (GetChildrenResponse) p.response; 53 if (rc == 0) { 54 cb.processResult(rc, clientPath, p.ctx, rsp 55 .getChildren()); 56 } else { 57 cb.processResult(rc, clientPath, p.ctx, null); 58 } 59 } else if (p.response instanceof GetChildren2Response) { 60 Children2Callback cb = (Children2Callback) p.cb; 61 GetChildren2Response rsp = (GetChildren2Response) p.response; 62 if (rc == 0) { 63 cb.processResult(rc, clientPath, p.ctx, rsp 64 .getChildren(), rsp.getStat()); 65 } else { 66 cb.processResult(rc, clientPath, p.ctx, null, null); 67 } 68 } else if (p.response instanceof CreateResponse) { 69 StringCallback cb = (StringCallback) p.cb; 70 CreateResponse rsp = (CreateResponse) p.response; 71 if (rc == 0) { 72 cb.processResult(rc, clientPath, p.ctx, 73 (chrootPath == null 74 ? rsp.getPath() 75 : rsp.getPath() 76 .substring(chrootPath.length()))); 77 } else { 78 cb.processResult(rc, clientPath, p.ctx, null); 79 } 80 } else if (p.cb instanceof VoidCallback) { 81 VoidCallback cb = (VoidCallback) p.cb; 82 cb.processResult(rc, clientPath, p.ctx); 83 } 84 } catch (Throwable t) { 85 LOG.error("Caught unexpected throwable", t); 86 } 87 } 88 }
zookeeper源码之客户端网络通信模块
ClientCnxn 为客户端发送请求到服务端,管理底层IO连接。将用户调用的请求对象(RequestHeader、Request)封装成Packet对象,存入发送队列。内部有一个线程会不断读取发送队列中的Packet对象,通过NIO将Packet对象发送到服务端,然... 查看详情
zookeeper源码分析之expiryqueue(代码片段)
ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情
zookeeper源码分析之expiryqueue(代码片段)
ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情
zookeeper之开源客户端zkclient
ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。ZKClient版本及源码maven依赖ZkClient目前有两个不同artifactId的系列。其中最早的0.1版本maven依赖如下:<dependenc... 查看详情
zookeeper之zookeeper底层客户端架构实现原理(转载)
Zookeeper的Client直接与用户打交道,是我们使用Zookeeper的interface。了解ZKClient的结构和工作原理有利于我们合理的使用ZK,并能在使用中更早的发现问题。本文将在研究源码的技术上讲述ZKClient的工作原理及内部工作机制。在看完ZKC... 查看详情
zookeeper源码分析目录
Zookeeper源码分析目录如下 1. 【Zookeeper】源码分析之序列化 2. 【Zookeeper】源码分析之持久化(一)之FileTxnLog 3. 【Zookeeper】源码分析之持久化(二)之FileSnap 4. 【Zookeeper】源码分析之持久化(三)之F... 查看详情
zookeeper源码之服务端
zookeeper服务端主要包括一下几个模块: 1.启动模块。 启动模块 读取配置文件,启动程序。详见:zookeeper源码之服务端启动模块。 查看详情
zookeeper源码之存储系统
存储系统实现了以下功能: 1.对配置按路径方式的读写,详见:zookeeper源码之配置存储。 2.对配置节点进行监听,详见:zookeeper源码之配置监听。 3.对配置节点进行权限控制。 4.配置的持久化,详见。 查看详情
zookeeper源码之服务端数据管理(代码片段)
ZooKeeper中数据存储分为两部分:内存数据存储与磁盘数据存储。主要分为以下几个部分: 1.数据库管理中心。 2.树结构数据管理 3.session数据管理 4.持久化管理数据库管理中心 负责ZooKeeper整个数据管理,详... 查看详情
zookeeper源码之服务端启动模块
...启动模块主要负责解析配置文件,启动服务器监听并执行zookeeper命令。类图 QuorumPeerMain QuorumPeerMain是服务端主程序,主要功能是解析配置文件,启动zookeeper服务。内部使用QuorumPeerConfig来解析配置文件;使用QuorumPeer来解析... 查看详情
zookeeper源码分析之expiryqueue(代码片段)
ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情
zookeeper源码分析之expiryqueue(代码片段)
ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情
zookeeper源码之配置监听
配置存储不仅维护了一个树结构,还对各个节点添加了变更监听。类图 DataTree内部维护两个通知管理器,分别监听节点数据变更和子节点变更。publicclassDataTree{privatefinalWatchManagerdataWatches=newWatchManager();privatefinalWatchMan... 查看详情
zookeeper源码之请求协议
Packet 包,ClientCnxn内部管理请求内容的模块。由以下几个模块组成: 1.RequestHeaderheader请求头 2.Recordrequest请求内容 3.ByteBufferbb实际需要发送的请求内容。 4.ReplyHeaderreplyHeader响应头 5.Recordresponse响应内容 6.... 查看详情
zookeeper源码分析之请求处理链
... 前面已经分析了Watcher机制的主要代码,现在接着分析Zookeeper中的请求处理链,其是Zookeeper的主要特点之一。二、总体框图 对于请求处理链而言,所有请求处理器的父接口为RequestProcessor,其框架图如下 说明: A... 查看详情
zookeeper源码分析之watcher机制
一、前言 前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。二、总体框图 对于Watcher机制而言,主要涉及的类主要如下。 说明: Watcher,接口类型,其定义了process方... 查看详情
zookeeper之zookeeper的client的分析
1)几个重要概念 ZooKeeper:客户端入口Watcher:客户端注册的callbackZooKeeper.SendThread: IO线程ZooKeeper.EventThread: 事件处理线程,处理各类消息callbackClientCnxnSocketNIO:继承自ClientCnxnSocket,专门处理IO 2)zookeeper初始化应用提 查看详情
zookeeper客户端之zkclient
【ZkClient】 ZkClient是GitHub上一个开源的客户端,如果我们用Maven来管理工程,则引用如下。<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version&g 查看详情