原理系列之——zookeeper的watch监控机制(代码片段)

烁华 烁华     2022-11-30     278

关键词:

在进入今天的正题之前,先来简单介绍下Zookeeper:
  Zookeeper是一个分布式应用程序协调服务,保证数据的一致性,其提供的功能包括:配置维护、域名维护、分布式同步、组服务等。
  watch监控机制是zookeeper的关键技术之一,本文将通过zk的部分源码来简单了解下watch机制的实现原理。

watch监控机制的实现原理

  当今时代,发布订阅场景到处可见,像微信中的公众号消息订阅,或者网购场景下库存消息的订阅通知等等,这些都是属于发布订阅的场景。
  watch监控机制是zk的一个关键技术,zk通过它来实现发布订阅的功能,通过watch我们可以联想到设计模式中的观察者模式,二者确实有点类似,你可以将其看成是分布式场景下的观察者模式。

客户端watch的注册和回调

客户端watch注册实现过程:
  发送一个带有watch事件的请求——>DataWatchRegistration保存watch事件——>将请求封装成Packet并放入一个队列等待发送——>调用SendThread中的readResponse——>ZKWatchManager将该watch事件进行存储

//Zookeeper.java
    public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException 
        PathUtils.validatePath(path);
        ZooKeeper.WatchRegistration wcb = null;
        if (watcher != null) 
        	//注册watch
            wcb = new ZooKeeper.DataWatchRegistration(watcher, path);
        

        String serverPath = this.prependChroot(path);
        RequestHeader h = new RequestHeader();
        h.setType(4);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) 
            throw KeeperException.create(Code.get(r.getErr()), path);
         else 
            if (stat != null) 
                DataTree.copyStat(response.getStat(), stat);
            

            return response.getData();
        
    
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException 
        ReplyHeader r = new ReplyHeader();
        ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration, watchDeregistration);
        synchronized(packet) 
            while(!packet.finished) 
                packet.wait();
            

            return r;
        
    
    public ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) 
        ClientCnxn.Packet packet = null;
        packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        synchronized(this.state) 
            if (this.state.isAlive() && !this.closing) 
                if (h.getType() == -11) 
                    this.closing = true;
                

                this.outgoingQueue.add(packet);
             else 
                this.conLossPacket(packet);
            
        

        this.sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    
    class SendThread extends ZooKeeperThread 
        .....
        void readResponse(ByteBuffer incomingBuffer) throws IOException 
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) 
                if (ClientCnxn.LOG.isDebugEnabled()) 
                    ClientCnxn.LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " after " + (System.nanoTime() - this.lastPingSentNs) / 1000000L + "ms");
                
             else if (replyHdr.getXid() == -4) 
                if (replyHdr.getErr() == Code.AUTHFAILED.intValue()) 
                    ClientCnxn.this.state = States.AUTH_FAILED;
                    ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
                

                if (ClientCnxn.LOG.isDebugEnabled()) 
                    ClientCnxn.LOG.debug("Got auth sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
                

             else if (replyHdr.getXid() == -1) 
                if (ClientCnxn.LOG.isDebugEnabled()) 
                    ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
                

                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                if (ClientCnxn.this.chrootPath != null) 
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) 
                        event.setPath("/");
                     else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) 
                        event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
                     else 
                        ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
                    
                

                WatchedEvent we = new WatchedEvent(event);
                if (ClientCnxn.LOG.isDebugEnabled()) 
                    ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
                

                ClientCnxn.this.eventThread.queueEvent(we);
             else if (this.tunnelAuthInProgress()) 
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia, "token");
                ClientCnxn.this.zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
             else 
                ClientCnxn.Packet packet;
                synchronized(ClientCnxn.this.pendingQueue) 
                    if (ClientCnxn.this.pendingQueue.size() == 0) 
                        throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
                    

                    packet = (ClientCnxn.Packet)ClientCnxn.this.pendingQueue.remove();
                

                try 
                    if (packet.requestHeader.getXid() != replyHdr.getXid()) 
                        packet.replyHeader.setErr(Code.CONNECTIONLOSS.intValue());
                        throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet);
                    

                    packet.replyHeader.setXid(replyHdr.getXid());
                    packet.replyHeader.setErr(replyHdr.getErr());
                    packet.replyHeader.setZxid(replyHdr.getZxid());
                    if (replyHdr.getZxid() > 0L) 
                        ClientCnxn.this.lastZxid = replyHdr.getZxid();
                    

                    if (packet.response != null && replyHdr.getErr() == 0) 
                        packet.response.deserialize(bbia, "response");
                    

                    if (ClientCnxn.LOG.isDebugEnabled()) 
                        ClientCnxn.LOG.debug("Reading reply sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", packet:: " + packet);
                    
                 finally 
                    ClientCnxn.this.finishPacket(packet);
                

            
        
private void finishPacket(ClientCnxn.Packet p) 
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) 
            p.watchRegistration.register(err);
        

        if (p.watchDeregistration != null) 
            Map materializedWatchers = null;

            try 
                materializedWatchers = p.watchDeregistration.unregister(err);
                Iterator i$ = materializedWatchers.entrySet().iterator();

                while(i$.hasNext()) 
                    Entry<EventType, Set<Watcher>> entry = (Entry)i$.next();
                    Set<Watcher> watchers = (Set)entry.getValue();
                    if (watchers.size() > 0) 
                        this.queueEvent(p.watchDeregistration.getClientPath(), err, watchers, (EventType)entry.getKey());
                        p.replyHeader.setErr(Code.OK.intValue());
                    
                
             catch (NoWatcherException var9) 
                LOG.error("Failed to find watcher!", var9);
                p.replyHeader.setErr(var9.code().intValue());
             catch (KeeperException var10) 
                LOG.error("Exception when removing watcher", var10);
                p.replyHeader.setErr(var10.code().intValue());
            
        

        if (p.cb == null) 
            synchronized(p) 
                p.finished = true;
                p.notifyAll();
            
         else 
            p.finished = true;
            this.eventThread.queuePacket(p);
        

    

客户端回调处理过程:
  在SendThread.readResponse()中的xid=-1来进行处理——>调用 eventThread.queueEvent()进行处理

    class SendThread extends ZooKeeperThread 
        .....
        void readResponse(ByteBuffer incomingBuffer) throws IOException 
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) 
  				....
             else if (replyHdr.getXid() == -1) 
                if (ClientCnxn.LOG.isDebugEnabled()) 
                    ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
                

                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                if (ClientCnxn.this查看详情  

大数据技术之hadoop——zookeeper(代码片段)

目录 一、认识Zookeeper1、概念2、特性3、集群角色二、数据模型1、数据存储结构2、Znode的类型3、Znode的属性 三、Zookeeper的Watch机制1、Watch机制的认识2、Watch机制的通知状态和时间类型四、Zookeeper的选举机制1、选举机制的认识2、... 查看详情

zookeeper学习笔记之leaderelection

ZooKeeper四种节点类型:PersistPersist_SequentialEphemeralEphemeral_Sequential在节点上可注册的Watch,客户端先得到通知再得到数据,Watch被fire后,不会再Watch到后续的变化。基于ZooKeeper做LeaderElection非公平模式-客户端会在Persist父节点下创建E... 查看详情

springcloudalibaba系列一文全面解析zookeeper安装常用命令javaapi操作watch事件监听分布式锁集群搭建核心理论(代码片段)

文章目录Zookeeper一、简介二、应用场景三、设计目标四、数据模型五、单机安装六、命令操作1.服务端常用命令2.客户端常用命令3.创建临时顺序结点七、JavaAPI操作1.Curator介绍2.建立连接3.添加结点4.查询结点5.设置结点6.删除结点7... 查看详情

zookeeper系列之:zookeeper简介浅谈

一、zookeeper的定义  打开zookeeper官网,赫然一行大字,写着:“ApacheZooKeeper致力于开发和维护实现高度可靠的分布式协调的开源服务器”。什么意思呢?就是ApacheZooKeeper的目标是开发和维护开源服务器,这服务器是干什么的呢... 查看详情

watcher实现机制之client注冊

Zookeeper提供的了分布式数据的公布/订阅功能,通过Watch机制来实现这样的分布式的通知功能。Zookeeper同意client向server注冊一个Watch监听。当服务端的一些指定的事件触发了这个Watch。就会向指定的client发送一个事件通知来实现分... 查看详情

大数据学习系列之七-----hadoop+spark+zookeeper+hbase+hive集

引言在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive环境以及一些测试。其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式。至于为什么先写单机的搭建,是因为作为个人学习的话,... 查看详情

iptables系列之基础原理

Linux网络防火墙 netfilter:frame过滤,内核中的过滤框架,规则生效的位置框架    iptables:附加在netfilter上,生成防火墙规则,真正实现数据报文过滤,NAT,mangle等规则生成的工具防火墙:工作在主机或者网络的... 查看详情

zabbix系列zabbix3.4监控zookeeper3.4.10

监控zookeeper来自网上,大家一搜就可搜到了,只是zabbix版本和zookeeper有点出入,自行修改一下就可以了。zookeeper监控要点系统监控这个监控linux系统以及修改linux服务器参数即可内存使用量ZooKeeper应当完全运行在内存中,不能使... 查看详情

storm基础系列之二----zookeeper的作用

在storm集群中,我们常常使用zookeeper作为协调者。那么具体发挥的是什么作用呢?概括来说,zookeeper是nimbus和supervisor进行交互的中介。具体来说有二:1、nimbus通过在zookeeper上写状态信息来分配任务。通俗的讲就是写哪些supervisor... 查看详情

面试题系列---vue中watch原理

1、普通的watch2、对象属性的watch:  1.对象和数组都是引用类型,引用类型变量存的是地址,地址没有变,所以不会触发watch。这时我们需要进行深度监听,就需要加上一个属性deep,值为true 2.watch有一个特点,当值第一次绑... 查看详情

vue之watch和计算属性computed

...但是这个变量一定是vue实例里面的。watch和computed均可以监控程序员想要监控的对象,当这些对象发生改变之后,可以触发回调函数做一些逻辑处理。一、计算属性computed的特点二、watch监控自身属性变化一、watch监控路由对象watch... 查看详情

java之springboot入门到精通idea版springboot原理分析,springboot监控(一篇文章精通系列)下(代码片段)

...列)【中】Java之SpringBoot入门到精通【IDEA版】SpringBoot原理分析,Spri 查看详情

zookeeper之zookeeper底层客户端架构实现原理(转载)

Zookeeper的Client直接与用户打交道,是我们使用Zookeeper的interface。了解ZKClient的结构和工作原理有利于我们合理的使用ZK,并能在使用中更早的发现问题。本文将在研究源码的技术上讲述ZKClient的工作原理及内部工作机制。在看完ZKC... 查看详情

zookeeper原理分析之存储结构snapshot

Zookeeper内存结构Zookeeper数据在内存中的结构类似于linux的目录结构,DataTree代表这个目录结构,DataNode代表一个节点。DataTree默认初始化三个目录:"","/zookeeper","/zookeeper/quota"DataNode表示一个节点,存储了一下信息:父节点的引用节... 查看详情

zookeeper监控的原理和使用(代码片段)

1、Watcher机制:  Zookeeper允许客户端向服务端的某个Znode注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据Watcher... 查看详情

死磕java同步系列之zookeeper分布式锁

问题(1)zookeeper如何实现分布式锁?(2)zookeeper分布式锁有哪些优点?(3)zookeeper分布式锁有哪些缺点?简介zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它可以为分布式应用提供一致性的服务,它是Hadoop和... 查看详情

jvm内部原理系列

JVM内部原理(一)—概述JVM内部原理(二)—基本概念之字节码JVM内部原理(三)—基本概念之类文件格式JVM内部原理(四)—基本概念之JVM结构JVM内部原理(五)—基本概念之Java虚拟机官方规范文档,第7版JVM内部原理(六)... 查看详情