zookeeper源码之客户端

zwh1988 zwh1988     2022-10-14     739

关键词:

  ZooKeeper客户端可以对指定节点设置指定Watcher,当服务器指定节点发生变化是,客户端会收到服务器的通知,然后客户端可以执行相应Watcher的代码。

  默认ZooKeeper内置了一个watcher,用于打印收到的服务器的通知。

源码ZooKeeperMain.Watcher:

 1 protected void connectToZK(String newHost) throws InterruptedException, IOException {
 2         if (zk != null && zk.getState().isAlive()) {
 3             zk.close();
 4         }
 5         host = newHost;
 6         zk = new ZooKeeper(host,
 7                  Integer.parseInt(cl.getOption("timeout")),
 8                  new MyWatcher());
 9  }
10 
11 private class MyWatcher implements Watcher {
12         public void process(WatchedEvent event) {
13             if (getPrintWatches()) {
14                 ZooKeeperMain.printMessage("WATCHER::");
15                 ZooKeeperMain.printMessage(event.toString());
16             }
17         }
18  }
View Code

  在获取子节点、获取数据、获取状态可以设置Watcher,该Watcher会被存储到Packet包中,当Packet包收到响应时注册该Watcher,当收到服务器notification时,执行Watcher代码。

源码ZooKeeper.getChildren:

 1 public List<String> getChildren(final String path, Watcher watcher)
 2         throws KeeperException, InterruptedException
 3     {
 4         //将watcher封装成childwatcher
 5         WatchRegistration wcb = null;
 6         if (watcher != null) {
 7             wcb = new ChildWatchRegistration(watcher, clientPath);
 8         }
 9 
10         final String serverPath = prependChroot(path);
11         RequestHeader h = new RequestHeader();
12         h.setType(ZooDefs.OpCode.getChildren);
13         GetChildrenRequest request = new GetChildrenRequest();
14         request.setPath(serverPath);
15         request.setWatch(watcher != null);
16         GetChildrenResponse response = new GetChildrenResponse();
17         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
18         if (r.getErr() != 0) {
19             throw KeeperException.create(KeeperException.Code.get(r.getErr()),
20                     clientPath);
21         }
22         return response.getChildren();
23     }
View Code

源码ClientCnxn.submitRequest:

 1 public ReplyHeader submitRequest(RequestHeader h, Record request,
 2             Record response, WatchRegistration watchRegistration)
 3             throws InterruptedException {
 4         ReplyHeader r = new ReplyHeader();
 5 //watchRegistration被封装到Packet中
 6         Packet packet = queuePacket(h, r, request, response, null, null, null,
 7                     null, watchRegistration);
 8         synchronized (packet) {
 9             while (!packet.finished) {
10                 packet.wait();
11             }
12         }
13         return r;
14     }
View Code

源码ClientCnxn.queuePacket:

 1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
 2             Record response, AsyncCallback cb, String clientPath,
 3             String serverPath, Object ctx, WatchRegistration watchRegistration)
 4     {
 5         Packet packet = null;
 6         synchronized (outgoingQueue) {
 7             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
 8                 h.setXid(getXid());
 9             }
10 //watchRegistration被封装到Packet中
11             packet = new Packet(h, r, request, response, null,
12                     watchRegistration);
13             packet.cb = cb;
14             packet.ctx = ctx;
15             packet.clientPath = clientPath;
16             packet.serverPath = serverPath;
17             if (!zooKeeper.state.isAlive() || closing) {
18                 conLossPacket(packet);
19             } else {
20                 // If the client is asking to close the session then
21                 // mark as closing
22                 if (h.getType() == OpCode.closeSession) {
23                     closing = true;
24                 }
25                 outgoingQueue.add(packet);
26             }
27         }
28 
29         sendThread.wakeup();
30         return packet;
31     }
View Code

当Packet包收到响应时注册该Watcher,源码ClientCnxn.finishPacket:

 1 private void finishPacket(Packet p) {
 2         if (p.watchRegistration != null) {
 3             p.watchRegistration.register(p.replyHeader.getErr());
 4         }
 5 
 6         if (p.cb == null) {
 7             synchronized (p) {
 8                 p.finished = true;
 9                 p.notifyAll();
10             }
11         } else {
12             p.finished = true;
13             eventThread.queuePacket(p);
14         }
15     }
View Code

当收到服务器notification时,执行Watcher代码,源码ClientCnxn.EventThread:

 1 private void processEvent(Object event) {
 2    if (event instanceof WatcherSetEventPair) {
 3 //执行watcher
 4                   WatcherSetEventPair pair = (WatcherSetEventPair) event;
 5                   for (Watcher watcher : pair.watchers) {
 6                       try {
 7                           watcher.process(pair.event);
 8                       } catch (Throwable t) {
 9                           LOG.error("Error while calling watcher ", t);
10                       }
11                   }
12               }
13 }
View Code

      在删除节点、创建节点、获取子节点、设置数据、获取数据、获取权限、设置权限等异步操作时,可以设置CallBack回调函数,该回调对象会被存储到Packet包中,当Packet包收到响应时执行CallBack代码。

源码ZooKeeper.getChildren:

 1 public void getChildren(final String path, Watcher watcher,
 2             ChildrenCallback cb, Object ctx)
 3     {
 4         final String clientPath = path;
 5         WatchRegistration wcb = null;
 6         if (watcher != null) {
 7             wcb = new ChildWatchRegistration(watcher, clientPath);
 8         }
 9 
10         final String serverPath = prependChroot(clientPath);
11 
12         RequestHeader h = new RequestHeader();
13         h.setType(ZooDefs.OpCode.getChildren);
14         GetChildrenRequest request = new GetChildrenRequest();
15         request.setPath(serverPath);
16         request.setWatch(watcher != null);
17         GetChildrenResponse response = new GetChildrenResponse();
18         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
19                 clientPath, serverPath, ctx, wcb);
20     }
View Code

源码ClientCnxn.queuePacket:

 1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
 2             Record response, AsyncCallback cb, String clientPath,
 3             String serverPath, Object ctx, WatchRegistration watchRegistration)
 4     {
 5         Packet packet = null;
 6         synchronized (outgoingQueue) {
 7             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
 8                 h.setXid(getXid());
 9             }
10             packet = new Packet(h, r, request, response, null,
11                     watchRegistration);
12             packet.cb = cb;
13             packet.ctx = ctx;
14             packet.clientPath = clientPath;
15             packet.serverPath = serverPath;
16             if (!zooKeeper.state.isAlive() || closing) {
17                 conLossPacket(packet);
18             } else {
19                 // If the client is asking to close the session then
20                 // mark as closing
21                 if (h.getType() == OpCode.closeSession) {
22                     closing = true;
23                 }
24                 outgoingQueue.add(packet);
25             }
26         }
27 
28         sendThread.wakeup();
29         return packet;
30     }
View Code

源码ClientCnxn.EventThread:

 1 private void processEvent(Object event) {
 2           try {
 3                   Packet p = (Packet) event;
 4                   int rc = 0;
 5                   String clientPath = p.clientPath;
 6                   if (p.replyHeader.getErr() != 0) {
 7                       rc = p.replyHeader.getErr();
 8                   }
 9                   if (p.response instanceof ExistsResponse
10                           || p.response instanceof SetDataResponse
11                           || p.response instanceof SetACLResponse) {
12                       StatCallback cb = (StatCallback) p.cb;
13                       if (rc == 0) {
14                           if (p.response instanceof ExistsResponse) {
15                               cb.processResult(rc, clientPath, p.ctx,
16                                       ((ExistsResponse) p.response)
17                                               .getStat());
18                           } else if (p.response instanceof SetDataResponse) {
19                               cb.processResult(rc, clientPath, p.ctx,
20                                       ((SetDataResponse) p.response)
21                                               .getStat());
22                           } else if (p.response instanceof SetACLResponse) {
23                               cb.processResult(rc, clientPath, p.ctx,
24                                       ((SetACLResponse) p.response)
25                                               .getStat());
26                           }
27                       } else {
28                           cb.processResult(rc, clientPath, p.ctx, null);
29                       }
30                   } else if (p.response instanceof GetDataResponse) {
31                       DataCallback cb = (DataCallback) p.cb;
32                       GetDataResponse rsp = (GetDataResponse) p.response;
33                       if (rc == 0) {
34                           cb.processResult(rc, clientPath, p.ctx, rsp
35                                   .getData(), rsp.getStat());
36                       } else {
37                           cb.processResult(rc, clientPath, p.ctx, null,
38                                   null);
39                       }
40                   } else if (p.response instanceof GetACLResponse) {
41                       ACLCallback cb = (ACLCallback) p.cb;
42                       GetACLResponse rsp = (GetACLResponse) p.response;
43                       if (rc == 0) {
44                           cb.processResult(rc, clientPath, p.ctx, rsp
45                                   .getAcl(), rsp.getStat());
46                       } else {
47                           cb.processResult(rc, clientPath, p.ctx, null,
48                                   null);
49                       }
50                   } else if (p.response instanceof GetChildrenResponse) {
51                       ChildrenCallback cb = (ChildrenCallback) p.cb;
52                       GetChildrenResponse rsp = (GetChildrenResponse) p.response;
53                       if (rc == 0) {
54                           cb.processResult(rc, clientPath, p.ctx, rsp
55                                   .getChildren());
56                       } else {
57                           cb.processResult(rc, clientPath, p.ctx, null);
58                       }
59                   } else if (p.response instanceof GetChildren2Response) {
60                       Children2Callback cb = (Children2Callback) p.cb;
61                       GetChildren2Response rsp = (GetChildren2Response) p.response;
62                       if (rc == 0) {
63                           cb.processResult(rc, clientPath, p.ctx, rsp
64                                   .getChildren(), rsp.getStat());
65                       } else {
66                           cb.processResult(rc, clientPath, p.ctx, null, null);
67                       }
68                   } else if (p.response instanceof CreateResponse) {
69                       StringCallback cb = (StringCallback) p.cb;
70                       CreateResponse rsp = (CreateResponse) p.response;
71                       if (rc == 0) {
72                           cb.processResult(rc, clientPath, p.ctx,
73                                   (chrootPath == null
74                                           ? rsp.getPath()
75                                           : rsp.getPath()
76                                     .substring(chrootPath.length())));
77                       } else {
78                           cb.processResult(rc, clientPath, p.ctx, null);
79                       }
80                   } else if (p.cb instanceof VoidCallback) {
81                       VoidCallback cb = (VoidCallback) p.cb;
82                       cb.processResult(rc, clientPath, p.ctx);
83                   }
84           } catch (Throwable t) {
85               LOG.error("Caught unexpected throwable", t);
86           }
87        }
88     }
View Code

 

zookeeper源码之客户端网络通信模块

ClientCnxn  为客户端发送请求到服务端,管理底层IO连接。将用户调用的请求对象(RequestHeader、Request)封装成Packet对象,存入发送队列。内部有一个线程会不断读取发送队列中的Packet对象,通过NIO将Packet对象发送到服务端,然... 查看详情

zookeeper源码分析之expiryqueue(代码片段)

 ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情

zookeeper源码分析之expiryqueue(代码片段)

 ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情

zookeeper之开源客户端zkclient

ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。ZKClient版本及源码maven依赖ZkClient目前有两个不同artifactId的系列。其中最早的0.1版本maven依赖如下:<dependenc... 查看详情

zookeeper之zookeeper底层客户端架构实现原理(转载)

Zookeeper的Client直接与用户打交道,是我们使用Zookeeper的interface。了解ZKClient的结构和工作原理有利于我们合理的使用ZK,并能在使用中更早的发现问题。本文将在研究源码的技术上讲述ZKClient的工作原理及内部工作机制。在看完ZKC... 查看详情

zookeeper源码分析目录

Zookeeper源码分析目录如下  1. 【Zookeeper】源码分析之序列化  2. 【Zookeeper】源码分析之持久化(一)之FileTxnLog  3. 【Zookeeper】源码分析之持久化(二)之FileSnap  4. 【Zookeeper】源码分析之持久化(三)之F... 查看详情

zookeeper源码之服务端

zookeeper服务端主要包括一下几个模块:  1.启动模块。 启动模块  读取配置文件,启动程序。详见:zookeeper源码之服务端启动模块。 查看详情

zookeeper源码之存储系统

  存储系统实现了以下功能:  1.对配置按路径方式的读写,详见:zookeeper源码之配置存储。  2.对配置节点进行监听,详见:zookeeper源码之配置监听。  3.对配置节点进行权限控制。  4.配置的持久化,详见。  查看详情

zookeeper源码之服务端数据管理(代码片段)

  ZooKeeper中数据存储分为两部分:内存数据存储与磁盘数据存储。主要分为以下几个部分:  1.数据库管理中心。  2.树结构数据管理  3.session数据管理  4.持久化管理数据库管理中心  负责ZooKeeper整个数据管理,详... 查看详情

zookeeper源码之服务端启动模块

...启动模块主要负责解析配置文件,启动服务器监听并执行zookeeper命令。类图  QuorumPeerMain  QuorumPeerMain是服务端主程序,主要功能是解析配置文件,启动zookeeper服务。内部使用QuorumPeerConfig来解析配置文件;使用QuorumPeer来解析... 查看详情

zookeeper源码分析之expiryqueue(代码片段)

 ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情

zookeeper源码分析之expiryqueue(代码片段)

 ExpiryQueue是zookeeper管理客户端连接超时的工具类。它是将松散的时间按expirationInterval间隔映射成一个一个具体的时间点。计算时间所属时间段的算法:privatelongroundToNextInterval(longtime)return(time/expirationInterval+1)*expirationInterval... 查看详情

zookeeper源码之配置监听

   配置存储不仅维护了一个树结构,还对各个节点添加了变更监听。类图  DataTree内部维护两个通知管理器,分别监听节点数据变更和子节点变更。publicclassDataTree{privatefinalWatchManagerdataWatches=newWatchManager();privatefinalWatchMan... 查看详情

zookeeper源码之请求协议

Packet  包,ClientCnxn内部管理请求内容的模块。由以下几个模块组成:  1.RequestHeaderheader请求头  2.Recordrequest请求内容  3.ByteBufferbb实际需要发送的请求内容。  4.ReplyHeaderreplyHeader响应头  5.Recordresponse响应内容  6.... 查看详情

zookeeper源码分析之请求处理链

... 前面已经分析了Watcher机制的主要代码,现在接着分析Zookeeper中的请求处理链,其是Zookeeper的主要特点之一。二、总体框图  对于请求处理链而言,所有请求处理器的父接口为RequestProcessor,其框架图如下    说明:  A... 查看详情

zookeeper源码分析之watcher机制

一、前言  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。二、总体框图  对于Watcher机制而言,主要涉及的类主要如下。    说明:  Watcher,接口类型,其定义了process方... 查看详情

zookeeper之zookeeper的client的分析

1)几个重要概念 ZooKeeper:客户端入口Watcher:客户端注册的callbackZooKeeper.SendThread: IO线程ZooKeeper.EventThread: 事件处理线程,处理各类消息callbackClientCnxnSocketNIO:继承自ClientCnxnSocket,专门处理IO 2)zookeeper初始化应用提 查看详情

zookeeper客户端之zkclient

【ZkClient】  ZkClient是GitHub上一个开源的客户端,如果我们用Maven来管理工程,则引用如下。<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version&g 查看详情