springboot基于zookeeper和curator实现分布式锁并分析其原理(代码片段)

嘉禾嘉宁papa 嘉禾嘉宁papa     2023-01-17     271

关键词:

一、简介

  我们知道在JDK 的 java.util.concurrent.locks包中提供了可重入锁,读写锁,及超时获取锁的方法等,但在分布式系统中,当多个应用需要共同操作某一个资源时,就没办法使用JDK里的锁实现了,所以今天的主角就是ZooKeeper + Curator 来完成分布式锁,Curator 提供的四种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

本文主要介绍可重入排它锁 InterProcessMutex 的相关使用及源码解读。

二、maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.alian</groupId>
    <artifactId>zookeeper-curator</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zookeeper-curator</name>
    <description>SpringBoot基于Zookeeper和Curator实现分布式锁</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、配置类

3.1、属性配置文件

# zookeeper服务器地址(ip+port)
zookeeper.server=10.130.3.16:2181
# 休眠时间
zookeeper.sleep-time=1000
# 最大重试次数
zookeeper.max-retries=3
# 会话超时时间
zookeeper.session-timeout=15000
# 连接超时时间
zookeeper.connection-timeout=5000

本机环境有限就不搭建集群了,具体还是在于curator分布式锁的使用及原理。

3.2、属性配置类

  此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式

ZookeeperProperties.java

package com.alian.zookeepercurator.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
//读取指定路径配置文件,暂不支持*.yaml文件
@PropertySource(value = "classpath:config/zookeeper.properties", encoding = "UTF-8", ignoreResourceNotFound = true)
public class ZookeeperProperties 

    /**
     * zookeeper服务地址
     */
    private String server;

    /**
     * 重试等待时间
     */
    private int sleepTime;

    /**
     * 最大重试次数
     */
    private int maxRetries;

    /**
     * session超时时间
     */
    private int sessionTimeout;

    /**
     * 连接超时时间
     */
    private int connectionTimeout;


3.3、ZookeeperConfig配置类(重要

ZookeeperConfig.java

  此配置类主要是使用CuratorFramework来连接zookeeper。

package com.alian.zookeepercurator.config;

import com.alian.zookeepercurator.common.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class ZookeeperConfig 

    @Autowired
    private ZookeeperProperties zookeeperProperties;

    @Bean
    public CuratorFramework curatorFrameworkClient() 
        //重试策略,ExponentialBackoffRetry(1000,3)这里表示等待1s重试,最大重试次数为3次
        RetryPolicy policy = new ExponentialBackoffRetry(zookeeperProperties.getSleepTime(), zookeeperProperties.getMaxRetries());
        //构建CuratorFramework实例
        CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory
                .builder()
                .connectString(zookeeperProperties.getServer())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
                .retryPolicy(policy)
                .build();
        //启动实例
        curatorFrameworkClient.start();
        return curatorFrameworkClient;
    

    //采用这种方式注册bean可以比较优雅的关闭连接
    @Bean(destroyMethod = "destroy")
    public ZookeeperClient zookeeperClient(CuratorFramework curatorFrameworkClient) 
        return new ZookeeperClient(curatorFrameworkClient);
    


3.4、ZookeeperClient配置类(重要

ZookeeperClient.java

  这个bean是在上面的配置类里定义的,还定义了销毁的方法,这样的好处是,当服务断开后,可以关闭连接,如果直接关闭服务可能会抛出一个异常。使用和其他的使用是一样的,当然如果你为了方便,使用@Component也没有问题。

package com.alian.zookeepercurator.common;

import com.alian.zookeepercurator.lock.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

@Slf4j
public class ZookeeperClient 

    private CuratorFramework curatorFramework;

    public ZookeeperClient(CuratorFramework curatorFramework) 
        this.curatorFramework = curatorFramework;
    

    public <T> T lock(AbstractLock<T> abstractLock) 
        //获取锁路径
        String lockPath = abstractLock.getLockPath();
        //创建InterProcessMutex实例
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); //创建锁对象
        boolean success = false;
        try 
            try 
                //加锁
                success = lock.acquire(abstractLock.getTime(), abstractLock.getTimeUnit()); //获取锁
             catch (Exception e) 
                throw new RuntimeException("尝试获取锁异常:" + e.getMessage() + ", lockPath " + lockPath);
            
            //判断是否加锁成功
            if (success) 
                return abstractLock.execute();
             else 
            	log.info("获取锁失败,返回null");
                return null;
            
         finally 
            try 
                if (success) 
                    //释放锁
                    lock.release();
                
             catch (Exception e) 
                log.error("释放锁异常: , lockPath ", e.getMessage(), lockPath);
            
        
    

    //bean的销毁方法
    public void destroy() 
        try 
            log.info("ZookeeperClient销毁方法,如果zookeeper连接不为空,则关闭连接");
            if (getCuratorFramework() != null) 
                //这种方式比较优雅的关闭连接
                getCuratorFramework().close();
            
         catch (Exception e) 
            log.error("stop zookeeper client error ", e.getMessage());
        
    

    public CuratorFramework getCuratorFramework() 
        return curatorFramework;
    

四、业务编写

4.1、抽象类AbstractLock

AbstractLock.java
  定义一个抽象锁的类,包含锁路径,过期时间及时间单位,子类只需要实现execute方法即可。

package com.alian.zookeepercurator.common;

import java.util.concurrent.TimeUnit;

public abstract class AbstractLock<T> 

    /**
     * 锁路径
     */
    protected String lockPath;

    /**
     * 超时时间
     */
    protected long time;

    protected TimeUnit timeUnit;

    public AbstractLock(String lockPath, long time, TimeUnit timeUnit) 
        this.lockPath = lockPath;
        this.time = time;
        this.timeUnit = timeUnit;
    

    public void setLockPath(String lockPath) 
        this.lockPath = lockPath;
    

    public String getLockPath() 
        return lockPath;
    

    public long getTime() 
        return time;
    

    public void setTime(long time) 
        this.time = time;
    

    public void setTimeUnit(TimeUnit timeUnit) 
        this.timeUnit = timeUnit;
    

    public TimeUnit getTimeUnit() 
        return timeUnit;
    

    /**
     * 执行业务的方法
     *
     * @return
     */
    public abstract T execute();

4.2、锁使实现(核心

CuratorLockService.java

package com.alian.zookeepercurator.service;

import com.alian.zookeepercurator.common.ZookeeperClient;
import com.alian.zookeepercurator.common.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class CuratorLockService 

    @Autowired
    private ZookeeperClient zookeeperClient;

    @Autowired
    private CuratorFramework curatorFramework;

    //库存存取的路径
    private static final String dataPath = "/root/data/stock";

    //初始化库存的路径
    private static final String initPath = "/root/init/stock";

    /**
     * 此方法系统启动执行,使用zookeeper存一个库存用于测试,这里也使用了锁。(只是一个模拟初始化库存的方法)
     */
    @PostConstruct
    public void init() 
        zookeeperClient.lock(new AbstractLock<Boolean>(initPath, 20, TimeUnit.SECONDS查看详情  

springboot整合dubbo和zookeeper

...就像调用本地服务一样简单。截至目前,Dubbo发布了基于SpringBoot构建的版本,版本号为0.2.0,这使得其与SpringBoot项目整合变得更为简单方便。而Zookeeper在这里充当的是服务注册中心的角色,我们将各个微服务提供的服务通过Dubbo... 查看详情

springboot基于zookeeper和curator实现分布式锁并分析其原理(代码片段)

目录一、简介二、maven依赖三、配置类3.1、属性配置文件3.2、属性配置类3.3、ZookeeperConfig配置类(重要)3.4、ZookeeperClient配置类(重要)四、业务编写4.1、抽象类AbstractLock4.2、锁使实现(核心)4.3、controller... 查看详情

springboot定时任务基于zookeeper的分布式锁实现

基于ZooKeeper分布式锁的流程在zookeeper指定节点(locks)下创建临时顺序节点node_n获取locks下所有子节点children对子节点按节点自增序号从小到大排序判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的... 查看详情

springboot整合dubbo和zookeeper

SpringBoot整合Dubbo和Zookeeper SpringBoot整合Dubbo和Zookeeper环境介绍Zookeeper安装启动Dubboadmin搭建创建主maven项目创建子springboot项目 环境介绍zookeeper安装dubbo-admin查看管理注册中心服务提供者和消费者Zookeeper安装http://zookeeper.apache.... 查看详情

springboot2整合zookeeper组件,管理架构中服务协调

本文源码:GitHub·点这里||GitEE·点这里一、Zookeeper基础简介1、概念简介Zookeeper是一个Apache开源的分布式的应用,为系统架构提供协调服务。从设计模式角度来审视:该组件是一个基于观察者模式设计的框架,负责存储和管理数据... 查看详情

springboot和dubbo+zookeeper组合案例

SpringBoot融合了maven的特点,所以可以和maven完美整合接下来要做一个分布式项目,首先要有共享的接口先建一个maven项目 详情:UserAddress.javapackagecom.changping.bean;importjava.io.Serializable;publicclassUserAddressimplementsSerializable{ private 查看详情

9.2springboot使用zookeeper和dubbo

一.项目搭建1.步骤建立一个空项目,添加两个springboot模块:provide-server和consumer-server 一个提供服务另一个消费服务(略)两个模块的pom.xml中都导入依赖编写provide-server代码编写consumer-server代码启动dubbo和zookeeper(略)启动prov... 查看详情

zkwebx-基于淘宝zkweb用springboot重构

访问地址:https://github.com/wendrewshay/zkwebx介绍:zkWebx-基于淘宝zkWeb用springboot重构的项目,可以方便管理zookeeper该项目是本人抽空整理的,放线上供各位同行使用,欢迎pull和issue。 查看详情

springboot整合dubbo+zookeeper

dockerpullzookeeperdockerrun--namezk01-p2181:2181--restartalways-d2e30cac00aca 表明zookeeper已成功启动 Zookeeper和Dubbo•ZooKeeperZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务。它是一个为分布式应用提供一致性服务的软件,... 查看详情

springboot整合dubbox与zookeeper

springboot中dubbo依赖的引入和配置(application.properties)参见:https://blog.csdn.net/wohaqiyi/article/details/72967805。需要注意的是在消费方工程里,controller包可以不必放到service包下(我认为也不应该),只要在application.properties里合理配置s... 查看详情

kafkakafka核心使用和springboot整合(代码片段)

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:... 查看详情

springboot整合dubbo+zookeeper——实现发布并访问远程服务

前言什么是Dubbo?——开源分布式服务框架Dubbo(读音[ˈdʌbəʊ])是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC实现服务的输出和输入功能,可以和Spring框架无缝集成。Dubbo是一款高性能、轻量级的... 查看详情

带着新人学springboot的应用12(springboot+dubbo+zookeeper下)

  上半节已经下载好了Zookeeper,以及新建了两个应用provider和consumer,这一节我们就结合dubbo来测试一下分布式可不可以用。  现在就来简单用一下,注意:这里只是涉及最简单的部分,新手入门用的,详细的内容要学习的可... 查看详情

基于solr和zookeeper的分布式搜索方案的配置

...要使用SolrCloud来满足这些需求。 SolrCloud是基于Solr和Zookeeper的分布式搜索方案,它的主要思想是 查看详情

springboot2整合kafka组件,应用案例和流程详解(代码片段)

本文源码:GitHub·点这里||GitEE·点这里一、搭建Kafka环境1、下载解压--下载wgethttp://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz--解压tar-zxvfkafka_2.11-2.2.0.tgz--重命名mvkafka_2.11-2.2.0kafka2.112、启动Kafka服务kafka依赖ZooKeeper服务,需要本... 查看详情

日志系统之基于zookeeper的分布式协同设计

...这段时间在设计和实现日志系统。在整个日志系统系统中Zookeeper的作用非常重要——它用于协调各个分布式组件并提供必要的配置信息和元数据。这篇文章主要分享一下Zookeeper的使用场景。这里主要涉及到Zookeeper在日志系统中的... 查看详情

搭建基于zookeeper和solr的分布式搜索:solrcloud

首先是zookeeper的集群搭建:1.新建文件夹/usr/local/solrcloud2.把解压的zookeeper拷贝三份到上面的solrcloud文件夹zookeeper01、zookeeper02、zookeeper033.zookeeper01目录下创建一个data/myid,myid写入1(02写入2,03写入3)4.配置zookeeper01-zookeeper03端口... 查看详情