死磕java同步系列之semaphore源码解析(代码片段)

author author     2022-12-20     412

关键词:

问题

(1)Semaphore是什么?

(2)Semaphore具有哪些特性?

(3)Semaphore通常使用在什么场景中?

(4)Semaphore的许可次数是否可以动态增减?

(5)Semaphore如何实现限流?

简介

Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。

特性

Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流。

下面我们一起来学习Java中Semaphore是如何实现的。

类结构

技术图片

Semaphore中包含了一个实现了AQS的同步器Sync,以及它的两个子类FairSync和NonFairSync,这说明Semaphore也是区分公平模式和非公平模式的。

源码分析

基于之前对于ReentrantLock和ReentrantReadWriteLock的分析,这篇文章相对来说比较简单,之前讲过的一些方法将直接略过,有兴趣的可以拉到文章底部查看之前的文章。

内部类Sync

// java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer 
    private static final long serialVersionUID = 1192457210091910933L;
    // 构造方法,传入许可次数,放入state中
    Sync(int permits) 
        setState(permits);
    
    // 获取许可次数
    final int getPermits() 
        return getState();
    
    // 非公平模式尝试获取许可
    final int nonfairTryAcquireShared(int acquires) 
        for (;;) 
            // 看看还有几个许可
            int available = getState();
            // 减去这次需要获取的许可还剩下几个许可
            int remaining = available - acquires;
            // 如果剩余许可小于0了则直接返回
            // 如果剩余许可不小于0,则尝试原子更新state的值,成功了返回剩余许可
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        
    
    // 释放许可
    protected final boolean tryReleaseShared(int releases) 
        for (;;) 
            // 看看还有几个许可
            int current = getState();
            // 加上这次释放的许可
            int next = current + releases;
            // 检测溢出
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 如果原子更新state的值成功,就说明释放许可成功,则返回true
            if (compareAndSetState(current, next))
                return true;
        
    
    // 减少许可
    final void reducePermits(int reductions) 
        for (;;) 
            // 看看还有几个许可
            int current = getState();
            // 减去将要减少的许可
            int next = current - reductions;
            // 检测举出
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // 原子更新state的值,成功了返回true
            if (compareAndSetState(current, next))
                return;
        
    
    // 销毁许可
    final int drainPermits() 
        for (;;) 
            // 看看还有几个许可
            int current = getState();
            // 如果为0,直接返回
            // 如果不为0,把state原子更新为0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        
    

通过Sync的几个实现方法,我们获取到以下几点信息:

(1)许可是在构造方法时传入的;

(2)许可存放在状态变量state中;

(3)尝试获取一个许可的时候,则state的值减1;

(4)当state的值为0的时候,则无法再获取许可;

(5)释放一个许可的时候,则state的值加1;

(6)许可的个数可以动态改变;

内部类NonfairSync

// java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync 
    private static final long serialVersionUID = -2694183684443567898L;
    // 构造方法,调用父类的构造方法
    NonfairSync(int permits) 
        super(permits);
    
    // 尝试获取许可,调用父类的nonfairTryAcquireShared()方法
    protected int tryAcquireShared(int acquires) 
        return nonfairTryAcquireShared(acquires);
    

非公平模式下,直接调用父类的nonfairTryAcquireShared()尝试获取许可。

内部类FairSync

// java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync 
    private static final long serialVersionUID = 2014338818796000944L;
    // 构造方法,调用父类的构造方法
    FairSync(int permits) 
        super(permits);
    
    // 尝试获取许可
    protected int tryAcquireShared(int acquires) 
        for (;;) 
            // 公平模式需要检测是否前面有排队的
            // 如果有排队的直接返回失败
            if (hasQueuedPredecessors())
                return -1;
            // 没有排队的再尝试更新state的值
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        
    

公平模式下,先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新state的值。

构造方法

// 构造方法,创建时要传入许可次数,默认使用非公平模式
public Semaphore(int permits) 
    sync = new NonfairSync(permits);

// 构造方法,需要传入许可次数,及是否公平模式
public Semaphore(int permits, boolean fair) 
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);

创建Semaphore时需要传入许可次数。

Semaphore默认也是非公平模式,但是你可以调用第二个构造方法声明其为公平模式。

下面的方法在学习过前面的内容看来都比较简单,彤哥这里只列举Semaphore支持的一些功能了。

以下的方法都是针对非公平模式来描述。

acquire()方法

public void acquire() throws InterruptedException 
    sync.acquireSharedInterruptibly(1);

获取一个许可,默认使用的是可中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。

acquireUninterruptibly()方法

public void acquireUninterruptibly() 
    sync.acquireShared(1);

获取一个许可,非中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。

tryAcquire()方法

public boolean tryAcquire() 
    return sync.nonfairTryAcquireShared(1) >= 0;

尝试获取一个许可,使用Sync的非公平模式尝试获取许可方法,不论是否获取到许可都返回,只尝试一次,不会进入队列排队。

tryAcquire(long timeout, TimeUnit unit)方法

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException 
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

尝试获取一个许可,先尝试一次获取许可,如果失败则会等待timeout时间,这段时间内都没有获取到许可,则返回false,否则返回true;

release()方法

public void release() 
    sync.releaseShared(1);

释放一个许可,释放一个许可时state的值会加1,并且会唤醒下一个等待获取许可的线程。

acquire(int permits)方法

public void acquire(int permits) throws InterruptedException 
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);

一次获取多个许可,可中断方式。

acquireUninterruptibly(int permits)方法

public void acquireUninterruptibly(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);

一次获取多个许可,非中断方式。

tryAcquire(int permits)方法

public boolean tryAcquire(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;

一次尝试获取多个许可,只尝试一次。

tryAcquire(int permits, long timeout, TimeUnit unit)方法

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException 
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

尝试获取多个许可,并会等待timeout时间,这段时间没获取到许可则返回false,否则返回true。

release(int permits)方法

public void release(int permits) 
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);

一次释放多个许可,state的值会相应增加permits的数量。

availablePermits()方法

public int availablePermits() 
    return sync.getPermits();

获取可用的许可次数。

drainPermits()方法

public int drainPermits() 
    return sync.drainPermits();

销毁当前可用的许可次数,对于已经获取的许可没有影响,会把当前剩余的许可全部销毁。

reducePermits(int reduction)方法

protected void reducePermits(int reduction) 
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);

减少许可的次数。

总结

(1)Semaphore,也叫信号量,通常用于控制同一时刻对共享资源的访问上,也就是限流场景;

(2)Semaphore的内部实现是基于AQS的共享锁来实现的;

(3)Semaphore初始化的时候需要指定许可的次数,许可的次数是存储在state中;

(4)获取一个许可时,则state值减1;

(5)释放一个许可时,则state值加1;

(6)可以动态减少n个许可;

(7)可以动态增加n个许可吗?

彩蛋

(1)如何动态增加n个许可?

答:调用release(int permits)即可。我们知道释放许可的时候state的值会相应增加,再回头看看释放许可的源码,发现与ReentrantLock的释放锁还是有点区别的,Semaphore释放许可的时候并不会检查当前线程有没有获取过许可,所以可以调用释放许可的方法动态增加一些许可。

(2)如何实现限流?

答:限流,即在流量突然增大的时候,上层要能够限制住突然的大流量对下游服务的冲击,在分布式系统中限流一般做在网关层,当然在个别功能中也可以自己简单地来限流,比如秒杀场景,假如只有10个商品需要秒杀,那么,服务本身可以限制同时只进来100个请求,其它请求全部作废,这样服务的压力也不会太大。

使用Semaphore就可以直接针对这个功能来限流,以下是代码实现:

public class SemaphoreTest 
    public static final Semaphore SEMAPHORE = new Semaphore(100);
    public static final AtomicInteger failCount = new AtomicInteger(0);
    public static final AtomicInteger successCount = new AtomicInteger(0);

    public static void main(String[] args) 
        for (int i = 0; i < 1000; i++) 
            new Thread(()->seckill()).start();
        
    

    public static boolean seckill() 
        if (!SEMAPHORE.tryAcquire()) 
            System.out.println("no permits, count="+failCount.incrementAndGet());
            return false;
        

        try 
            // 处理业务逻辑
            Thread.sleep(2000);
            System.out.println("seckill success, count="+successCount.incrementAndGet());
         catch (InterruptedException e) 
            // todo 处理异常
            e.printStackTrace();
         finally 
            SEMAPHORE.release();
        
        return true;
    

推荐阅读

1、 死磕 java同步系列之开篇

2、 死磕 java魔法类之Unsafe解析

3、 死磕 java同步系列之JMM(Java Memory Model)

4、 死磕 java同步系列之volatile解析

5、 死磕 java同步系列之synchronized解析

6、 死磕 java同步系列之自己动手写一个锁Lock

7、 死磕 java同步系列之AQS起篇

8、 死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

9、 死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

10、 死磕 java同步系列之ReentrantLock VS synchronized

11、 死磕 java同步系列之ReentrantReadWriteLock源码解析

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

技术图片

死磕java同步系列之countdownlatch源码解析(代码片段)

??欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章,与彤哥一起畅游源码的海洋。(手机横屏看源码更方便)问题(1)CountDownLatch是什么?(2)CountDownLatch具有哪些特性?(3)CountDownLatch通常运用在什么场景中?(4... 查看详情

死磕java同步系列之phaser源码解析

问题(1)Phaser是什么?(2)Phaser具有哪些特性?(3)Phaser相对于CyclicBarrier和CountDownLatch的优势?简介Phaser,翻译为阶段,它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,... 查看详情

死磕java同步系列之reentrantreadwritelock源码解析(代码片段)

问题(1)读写锁是什么?(2)读写锁具有哪些特性?(3)ReentrantReadWriteLock是怎么实现读写锁的?(4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap?简介读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问... 查看详情

死磕java同步系列之开篇

...,本来是准备写锁相关的内容,但是java中的CountDownLatch、Semaphore、CyclicBarrier这些类又不属于锁,它们和锁又有很多共同点,都是为了协同多线程的执行,都是一种同步器,所以这里就借用同步来取名字了,也就是“同步系列”... 查看详情

死磕java同步系列之synchronized解析(代码片段)

问题(1)synchronized的特性?(2)synchronized的实现原理?(3)synchronized是否可重入?(4)synchronized是否是公平锁?(5)synchronized的优化?(6)synchronized的五种使用方式?简介synchronized关键字是Java里面最基本的同步手段,它经... 查看详情

死磕java同步系列之终结篇

欢×××”,查看更多源码系列文章,与彤哥一起畅游源码的海洋。(手机横屏看源码更方便)简介同步系列到此就结束了,本篇文章对同步系列做一个总结。脑图下面是关于同步系列的一份脑图,列举了主要的知识点和问题点,... 查看详情

死磕java同步系列之volatile解析(代码片段)

问题(1)volatile是如何保证可见性的?(2)volatile是如何禁止重排序的?(3)volatile的实现原理?(4)volatile的缺陷?简介volatile可以说是Java虚拟机提供的最轻量级的同步机制了,但是它并不容易被正确地理解,以至于很多人不... 查看详情

死磕java线程系列之线程池深入解析——未来任务执行流程

...怎么实现的呢?建议学习本章前先去看看彤哥之前写的《死磕java线程系列之自己动手写一个线程池 查看详情

死磕java线程系列之线程池深入解析——生命周期

(手机横屏看源码更方便)注:java源码分析部分如无特殊说明均基于java8版本。注:线程池源码部分如无特殊说明均指ThreadPoolExecutor类。简介上一章我们一起重温了下线程的生命周期(六种状态还记得不?),但是你知不知道其... 查看详情

死磕java线程系列之线程池深入解析——体系结构

(手机横屏看源码更方便)注:java源码分析部分如无特殊说明均基于java8版本。简介Java的线程池是块硬骨头,对线程池的源码做深入研究不仅能提高对Java整个并发编程的理解,也能提高自己在面试中的表现,增加被录取的可能... 查看详情

死磕java线程系列之线程池深入解析——构造方法

(手机横屏看源码更方便)注:java源码分析部分如无特殊说明均基于java8版本。简介ThreadPoolExecutor的构造方法是创建线程池的入口,虽然比较简单,但是信息量很大,由此也能引发一系列的问题,同样地,这也是面试中经常被问... 查看详情

死磕java线程系列之线程池深入解析——定时任务执行流程

(手机横屏看源码更方便)注:java源码分析部分如无特殊说明均基于java8版本。注:本文基于ScheduledThreadPoolExecutor定时线程池类。简介前面我们一起学习了普通任务、未来任务的执行流程,今天我们再来学习一种新的任务——定... 查看详情

死磕java同步系列之zookeeper分布式锁

问题(1)zookeeper如何实现分布式锁?(2)zookeeper分布式锁有哪些优点?(3)zookeeper分布式锁有哪些缺点?简介zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它可以为分布式应用提供一致性的服务,它是Hadoop和... 查看详情

java进阶之路-java同步系列之reentrantreadwritelock源码解析(代码片段)

本文参照java同步系列之ReentrantReadWriteLock源码解析简介读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写... 查看详情

死磕java同步系列之jmm(javamemorymodel)(代码片段)

简介Java内存模型是在硬件内存模型上的更高层的抽象,它屏蔽了各种硬件和操作系统访问的差异性,保证了Java程序在各种平台下对内存的访问都能达到一致的效果。硬件内存模型在正式讲解Java的内存模型之前,我们有必要先了... 查看详情

死磕java同步系列之mysql分布式锁

问题(1)什么是分布式锁?(2)为什么需要分布式锁?(3)mysql如何实现分布式锁?(4)mysql分布式锁的优点和缺点?简介随着并发量的不断增加,单机的服务迟早要向多节点或者微服务进化,这时候原来单机模式下使用的sync... 查看详情

死磕java同步系列之redis分布式锁进化史

问题(1)redis如何实现分布式锁?(2)redis分布式锁有哪些优点?(3)redis分布式锁有哪些缺点?(4)redis实现分布式锁有没有现成的轮子可以使用?简介Redis(全称:RemoteDictionaryServer远程字典服务)是一个开源的使用ANSIC语言... 查看详情

死磕java线程系列之自己动手写一个线程池(代码片段)

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章,与彤哥一起畅游源码的海洋。(手机横屏看源码更方便)问题(1)自己动手写一个线程池需要考虑哪些因素?(2)自己动手写的线程池如何测试?简介线程池是Java... 查看详情