基于zookeeper简单实现分布式锁

lytwajue lytwajue     2022-09-05     799

关键词:

这里利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制。来简单实现分布式锁。

主要思想:
1、开启10个线程。在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点。
2、获取disLocks节点下全部子节点,排序,假设自己的节点编号最小,则获取锁;
3、否则watch排在自己前面的节点,监听到其删除后,进入第2步(又一次检測排序是防止监听的节点发生连接失效。导致的节点删除情况);
4、删除自身sub节点,释放连接;

这里插播下zookeeper的4种节点类型:
public enum CreateMode {
   
    /**
     * 持久节点:节点创建后。会一直存在,不会因client会话失效而删除;
     */
    PERSISTENT (0, false, false),

    /**
    * 持久顺序节点:基本特性与持久节点一致。创建节点的过程中。zookeeper会在其名字后自己主动追加一个单调增长的数字后缀,作为新的节点名。 
    */
    PERSISTENT_SEQUENTIAL (2, false, true),

    /**
     *  暂时节点:client会话失效或连接关闭后,该节点会被自己主动删除。且不能再暂时节点以下创建子节点,否则报例如以下错:org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException;
     */
    EPHEMERAL (1, true, false),

    /**
     * 暂时顺序节点:基本特性与暂时节点一致。创建节点的过程中,zookeeper会在其名字后自己主动追加一个单调增长的数字后缀。作为新的节点名; 
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);
    private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);
    private boolean ephemeral;
    private boolean sequential;
    private int flag;
    CreateMode(int flag, boolean ephemeral, boolean sequential) {
        this.flag = flag;
        this.ephemeral = ephemeral;
        this.sequential = sequential;
    }
    public boolean isEphemeral() {
        return ephemeral;
    }
    public boolean isSequential() {
        return sequential;
    }
    public int toFlag() {
        return flag;
    }
    static public CreateMode fromFlag(int flag) throws KeeperException {
        switch(flag) {
        case 0: return CreateMode.PERSISTENT;
        case 1: return CreateMode.EPHEMERAL;
        case 2: return CreateMode.PERSISTENT_SEQUENTIAL;
        case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;
        default:
            LOG.error("Received an invalid flag value to convert to a CreateMode");
            throw new KeeperException.BadArgumentsException();
        }
    }
}


測试代码:
package zookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

public class DistributedLock implements Watcher{
    private int threadId;
    private ZooKeeper zk = null;
    private String selfPath;
    private String waitPath;
    private String LOG_PREFIX_OF_THREAD;
    private static final int SESSION_TIMEOUT = 10000;
    private static final String GROUP_PATH = "/disLocks";
    private static final String SUB_PATH = "/disLocks/sub";
    private static final String CONNECTION_STRING = "192.168.*.*:2181";
    
    private static final int THREAD_NUM = 10; 
    //确保连接zk成功。
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    //确保全部线程执行结束;
    private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);
    private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class);
    public DistributedLock(int id) {
        this.threadId = id;
        LOG_PREFIX_OF_THREAD = "【第"+threadId+"个线程】";
    }
    public static void main(String[] args) {
        for(int i=0; i < THREAD_NUM; i++){
            final int threadId = i+1;
            new Thread(){
                @Override
                public void run() {
                    try{
                        DistributedLock dc = new DistributedLock(threadId);
                        dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
                        //GROUP_PATH不存在的话,由一个线程创建就可以;
                        synchronized (threadSemaphore){
                            dc.createPath(GROUP_PATH, "该节点由线程" + threadId + "创建", true);
                        }
                        dc.getLock();
                    } catch (Exception e){
                        LOG.error("【第"+threadId+"个线程】 抛出的异常:");
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        try {
            threadSemaphore.await();
            LOG.info("全部线程执行结束!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    /**
     * 获取锁
     * @return
     */
    private void getLock() throws KeeperException, InterruptedException {
        selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath);
        if(checkMinPath()){
            getLockSuccess();
        }
    }
    /**
     * 创建节点
     * @param path 节点path
     * @param data 初始数据内容
     * @return
     */
    public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {
        if(zk.exists(path, needWatch)==null){
            LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "
                    + this.zk.create( path,
                    data.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT )
                    + ", content: " + data );
        }
        return true;
    }
    /**
     * 创建ZK连接
     * @param connectString	 ZKserver地址列表
     * @param sessionTimeout Session超时时间
     */
    public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {
            zk = new ZooKeeper( connectString, sessionTimeout, this);
            connectedSemaphore.await();
    }
    /**
     * 获取锁成功
    */
    public void getLockSuccess() throws KeeperException, InterruptedException {
        if(zk.exists(this.selfPath,false) == null){
            LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");
            return;
        }
        LOG.info(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!

"); Thread.sleep(2000); LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath); zk.delete(this.selfPath, -1); releaseConnection(); threadSemaphore.countDown(); } /** * 关闭ZK连接 */ public void releaseConnection() { if ( this.zk !=null ) { try { this.zk.close(); } catch ( InterruptedException e ) {} } LOG.info(LOG_PREFIX_OF_THREAD + "释放连接"); } /** * 检查自己是不是最小的节点 * @return */ public boolean checkMinPath() throws KeeperException, InterruptedException { List<String> subNodes = zk.getChildren(GROUP_PATH, false); Collections.sort(subNodes); int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1)); switch (index){ case -1:{ LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath); return false; } case 0:{ LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath); return true; } default:{ this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1); LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath); try{ zk.getData(waitPath, true, new Stat()); return false; }catch(KeeperException e){ if(zk.exists(waitPath,false) == null){ LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?"); return checkMinPath(); }else{ throw e; } } } } } @Override public void process(WatchedEvent event) { if(event == null){ return; } Event.KeeperState keeperState = event.getState(); Event.EventType eventType = event.getType(); if ( Event.KeeperState.SyncConnected == keeperState) { if ( Event.EventType.None == eventType ) { LOG.info( LOG_PREFIX_OF_THREAD + "成功连接上ZKserver" ); connectedSemaphore.countDown(); }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { LOG.info(LOG_PREFIX_OF_THREAD + "收到情报。排我前面的家伙已挂,我是不是能够出山了?"); try { if(checkMinPath()){ getLockSuccess(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }else if ( Event.KeeperState.Disconnected == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "与ZKserver断开连接" ); } else if ( Event.KeeperState.AuthFailed == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "权限检查失败" ); } else if ( Event.KeeperState.Expired == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "会话失效" ); } } }


log配置文件:
# DEFAULT 
log4j.rootLogger=INFO,CONSOLE

#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %m%n


log4j.appender.COMMONSTAT=org.apache.log4j.DailyRollingFileAppender
log4j.appender.COMMONSTAT.Threshold=INFO
log4j.appender.COMMONSTAT.File=/home/zookeeper/zookeeper-test-agent/logs/test.log
log4j.appender.COMMONSTAT.DatePattern=‘.‘yyyy-MM-dd

log4j.appender.COMMONSTAT.layout=org.apache.log4j.PatternLayout
log4j.appender.COMMONSTAT.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] - %m%n

log4j.logger.org.displaytag=WARN
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.springframework=WARN
log4j.logger.org.I0Itec=WARN
log4j.logger.commonStat=INFO,COMMONSTAT

执行结果:
2014-11-19 11:34:10,894 - 【第9个线程】成功连接上ZKserver
2014-11-19 11:34:10,895 - 【第8个线程】成功连接上ZKserver
2014-11-19 11:34:10,894 - 【第1个线程】成功连接上ZKserver
2014-11-19 11:34:10,894 - 【第7个线程】成功连接上ZKserver
2014-11-19 11:34:10,894 - 【第4个线程】成功连接上ZKserver
2014-11-19 11:34:10,895 - 【第5个线程】成功连接上ZKserver
2014-11-19 11:34:10,896 - 【第2个线程】成功连接上ZKserver
2014-11-19 11:34:10,894 - 【第10个线程】成功连接上ZKserver
2014-11-19 11:34:10,894 - 【第3个线程】成功连接上ZKserver
2014-11-19 11:34:10,895 - 【第6个线程】成功连接上ZKserver
2014-11-19 11:34:10,910 - 【第9个线程】节点创建成功, Path: /disLocks, content: 该节点由线程9创建
2014-11-19 11:34:10,912 - 【第9个线程】创建锁路径:/disLocks/sub0000000000
2014-11-19 11:34:10,917 - 【第6个线程】创建锁路径:/disLocks/sub0000000001
2014-11-19 11:34:10,917 - 【第9个线程】子节点中。我果然是老大/disLocks/sub0000000000
2014-11-19 11:34:10,921 - 【第3个线程】创建锁路径:/disLocks/sub0000000002
2014-11-19 11:34:10,922 - 【第6个线程】获取子节点中。排在我前面的/disLocks/sub0000000000
2014-11-19 11:34:10,923 - 【第9个线程】获取锁成功,赶紧干活!

2014-11-19 11:34:10,924 - 【第10个线程】创建锁路径:/disLocks/sub0000000003 2014-11-19 11:34:10,924 - 【第3个线程】获取子节点中。排在我前面的/disLocks/sub0000000001 2014-11-19 11:34:10,928 - 【第10个线程】获取子节点中,排在我前面的/disLocks/sub0000000002 2014-11-19 11:34:10,929 - 【第1个线程】创建锁路径:/disLocks/sub0000000004 2014-11-19 11:34:10,932 - 【第5个线程】创建锁路径:/disLocks/sub0000000005 2014-11-19 11:34:10,935 - 【第1个线程】获取子节点中。排在我前面的/disLocks/sub0000000003 2014-11-19 11:34:10,936 - 【第2个线程】创建锁路径:/disLocks/sub0000000006 2014-11-19 11:34:10,936 - 【第5个线程】获取子节点中,排在我前面的/disLocks/sub0000000004 2014-11-19 11:34:10,940 - 【第4个线程】创建锁路径:/disLocks/sub0000000007 2014-11-19 11:34:10,941 - 【第2个线程】获取子节点中,排在我前面的/disLocks/sub0000000005 2014-11-19 11:34:10,943 - 【第8个线程】创建锁路径:/disLocks/sub0000000008 2014-11-19 11:34:10,944 - 【第4个线程】获取子节点中。排在我前面的/disLocks/sub0000000006 2014-11-19 11:34:10,945 - 【第7个线程】创建锁路径:/disLocks/sub0000000009 2014-11-19 11:34:10,946 - 【第8个线程】获取子节点中。排在我前面的/disLocks/sub0000000007 2014-11-19 11:34:10,947 - 【第7个线程】获取子节点中。排在我前面的/disLocks/sub0000000008 2014-11-19 11:34:12,923 - 【第9个线程】删除本节点:/disLocks/sub0000000000 2014-11-19 11:34:12,926 - 【第6个线程】收到情报,排我前面的家伙已挂。我是不是能够出山了? 2014-11-19 11:34:12,928 - 【第6个线程】子节点中。我果然是老大/disLocks/sub0000000001 2014-11-19 11:34:12,930 - 【第9个线程】释放连接 2014-11-19 11:34:12,930 - 【第6个线程】获取锁成功,赶紧干活。 2014-11-19 11:34:14,930 - 【第6个线程】删除本节点:/disLocks/sub0000000001 2014-11-19 11:34:14,937 - 【第3个线程】收到情报,排我前面的家伙已挂,我是不是能够出山了? 2014-11-19 11:34:14,941 - 【第3个线程】子节点中,我果然是老大/disLocks/sub0000000002 2014-11-19 11:34:14,943 - 【第6个线程】释放连接 2014-11-19 11:34:14,946 - 【第3个线程】获取锁成功,赶紧干活。 2014-11-19 11:34:16,946 - 【第3个线程】删除本节点:/disLocks/sub0000000002 2014-11-19 11:34:16,949 - 【第10个线程】收到情报,排我前面的家伙已挂,我是不是能够出山了? 2014-11-19 11:34:16,951 - 【第10个线程】子节点中。我果然是老大/disLocks/sub0000000003 2014-11-19 11:34:16,953 - 【第3个线程】释放连接 2014-11-19 11:34:16,953 - 【第10个线程】获取锁成功。赶紧干活! 2014-11-19 11:34:18,953 - 【第10个线程】删除本节点:/disLocks/sub0000000003 2014-11-19 11:34:18,957 - 【第1个线程】收到情报,排我前面的家伙已挂,我是不是能够出山了? 2014-11-19 11:34:18,960 - 【第10个线程】释放连接 2014-11-19 11:34:18,961 - 【第1个线程】子节点中,我果然是老大/disLocks/sub0000000004 2014-11-19 11:34:18,964 - 【第1个线程】获取锁成功,赶紧干活。 2014-11-19 11:34:20,964 - 【第1个线程】删除本节点:/disLocks/sub0000000004 2014-11-19 11:34:20,967 - 【第5个线程】收到情报。排我前面的家伙已挂,我是不是能够出山了? 2014-11-19 11:34:20,969 - 【第5个线程】子节点中,我果然是老大/disLocks/sub0000000005 2014-11-19 11:34:20,971 - 【第1个线程】释放连接 2014-11-19 11:34:20,971 - 【第5个线程】获取锁成功,赶紧干活!

2014-11-19 11:34:22,971 - 【第5个线程】删除本节点:/disLocks/sub0000000005 2014-11-19 11:34:22,974 - 【第2个线程】收到情报。排我前面的家伙已挂,我是不是能够出山了? 2014-11-19 11:34:22,978 - 【第2个线程】子节点中,我果然是老大/disLocks/sub0000000006 2014-11-19 11:34:22,979 - 【第5个线程】释放连接 2014-11-19 11:34:22,981 - 【第2个线程】获取锁成功,赶紧干活! 2014-11-19 11:34:24,981 - 【第2个线程】删除本节点:/disLocks/sub0000000006 2014-11-19 11:34:24,985 - 【第4个线程】收到情报。排我前面的家伙已挂,我是不是能够出山了? 2014-11-19 11:34:24,989 - 【第2个线程】释放连接 2014-11-19 11:34:24,989 - 【第4个线程】子节点中,我果然是老大/disLocks/sub0000000007 2014-11-19 11:34:24,995 - 【第4个线程】获取锁成功,赶紧干活!

2014-11-19 11:34:26,995 - 【第4个线程】删除本节点:/disLocks/sub0000000007 2014-11-19 11:34:26,998 - 【第8个线程】收到情报。排我前面的家伙已挂。我是不是能够出山了? 2014-11-19 11:34:27,000 - 【第8个线程】子节点中,我果然是老大/disLocks/sub0000000008 2014-11-19 11:34:27,004 - 【第8个线程】获取锁成功,赶紧干活!

2014-11-19 11:34:27,004 - 【第4个线程】释放连接 2014-11-19 11:34:29,004 - 【第8个线程】删除本节点:/disLocks/sub0000000008 2014-11-19 11:34:29,007 - 【第7个线程】收到情报。排我前面的家伙已挂,我是不是能够出山了? 2014-11-19 11:34:29,009 - 【第7个线程】子节点中。我果然是老大/disLocks/sub0000000009 2014-11-19 11:34:29,010 - 【第8个线程】释放连接 2014-11-19 11:34:29,011 - 【第7个线程】获取锁成功,赶紧干活! 2014-11-19 11:34:31,011 - 【第7个线程】删除本节点:/disLocks/sub0000000009 2014-11-19 11:34:31,017 - 【第7个线程】释放连接 2014-11-19 11:34:31,017 - 全部线程执行结束!




基于zookeeper实现分布式锁(代码片段)

什么是分布式锁分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式... 查看详情

分布式锁的三种实现方式及其比较

1实现方式分布式锁的实现,目前比较常用的有以下几种方案:基于Zookeeper实现实现分布式锁基于缓存(如redis等)分布式锁基于数据库实现分布式锁2基于Zookeeper实现实现分布式锁实现原理是:每个客户端对某个方法加锁时,在zo... 查看详情

分布式锁与实现——基于zookeeper实现

引言ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步... 查看详情

redis实现分布式锁与zookeeper实现分布式锁区别

...前言:在学习过程中,简单的整理了一些redis跟zookeeper实现分布式锁的区别,有需要改正跟补充的地方,希望各位大佬及时指出Redis实现分布式锁思路基于Redis实现分布式锁(setnx)setnx也可以存入key,如果存入key成功返回1,如果存... 查看详情

基于zookeeper实现的分布式互斥锁-interprocessmutex(代码片段)

Curator是ZooKeeper的一个客户端框架,其中封装了分布式互斥锁的实现,最为常用的是InterProcessMutex,本文将对其进行代码剖析简介InterProcessMutex基于Zookeeper实现了分布式的公平可重入互斥锁,类似于单个JVM进程内的ReentrantLock(fair=tru... 查看详情

基于zookeeper实现分布式锁

一、分布式锁介绍       分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。  线程锁:大家都不陌生,主要用来给方法、代码块加锁... 查看详情

基于zookeeper实现分布式锁

1.什么是分布式锁要介绍分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。(1)线程锁:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同... 查看详情

分布式锁与实现——基于zookeeper实现

引言ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步... 查看详情

基于zookeeper实现多进程分布式锁

一、zookeeper简介及基本操作Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。当对目录节点监控状态打开时,一旦目录节点的状态发生变化,Watcher 对象的 process 方法... 查看详情

基于zookeeper的分布式锁(代码片段)

实现分布式锁目前有三种流行方案,分别为基于数据库、Redis、Zookeeper的方案,其中前两种方案网络上有很多资料可以参考,本文不做展开。我们来看下使用Zookeeper如何实现分布式锁。什么是Zookeeper?Zookeeper(业界简称zk)是一种提... 查看详情

分布式锁的实现基于zookeeper(代码片段)

引言ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步... 查看详情

springboot定时任务基于zookeeper的分布式锁实现

基于ZooKeeper分布式锁的流程在zookeeper指定节点(locks)下创建临时顺序节点node_n获取locks下所有子节点children对子节点按节点自增序号从小到大排序判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的... 查看详情

springboot基于zookeeper原生方式实现分布式锁(代码片段)

...背景  我在之前的文章SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理详细介绍了它的使用及其原理, 查看详情

zookeeper--zookeeper分布式锁原理

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区... 查看详情

zookeeper分布式锁简单实践(代码片段)

ZooKeeper分布式锁的实现原理在分布式解决方案中,Zookeeper是一个分布式协调工具。当多个JVM客户端,同时在ZooKeeper上创建相同的一个临时节点,因为临时节点路径是保证唯一,只要谁能够创建节点成功,谁就能够获取到锁。没有... 查看详情

利用zookeeper实现分布式锁(代码片段)

...机环境中,我们可以通过Java提供的并发API来解决;而在分布式环境(会遇到网络故障、消息重复、消息丢失等各种问题)下要复杂得多,常见的解决方案是分布式事务、分布式锁等。本文主要探讨如何利用Zookeeper来实现分布式锁。... 查看详情

基于zookeeper实现分布式锁实践(代码片段)

基于Zookeeper实现分布式锁实践1、什么是Zookeeper?Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件。引用官网的图例:特征:zookeeper的数据机构是一种节点树的数据结构,... 查看详情

zookeeper怎么实现分布式锁

1.利用节点名称的唯一性来实现共享锁ZooKeeper抽象出来的节点结构是一个和unix文件系统类似的小型的树状的目录结构。ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名。例如:我们在Zookeeper目录/test目录下创建,两个客... 查看详情