关键词:
1. 为什么选择 Etcd
据官网介绍,Etcd 是一个分布式,可靠的 Key-Value 存储系统,主要用于存储分布式系统中的关键数据。初见之下,Etcd 与 NoSQL 数据库系统有几分相似,但作为数据库绝非 Etcd 所长,其读写性能远不如 MongoDB、Redis 等 Key-Value 存储系统。“让专业的人做专业的事!” Ectd 作为一个高可用的键值存储系统,有很多典型的应用场景,本文将介绍 Etcd 的优秀实践之一:分布式锁。
1.1 Etcd 优点
目前,可实现分布式锁的开源软件有很多,其中应用最广泛、大家最熟悉的应该就是 ZooKeeper,此外还有数据库、Redis、Chubby 等。但若从读写性能、可靠性、可用性、安全性和复杂度等方面综合考量,作为后起之秀的 Etcd 无疑是其中的 “佼佼者” 。它完全媲美业界“名宿”ZooKeeper,在有些方面,Etcd 甚至超越了 ZooKeeper,如 Etcd 采用的 Raft 协议就要比 ZooKeeper 采用的 Zab 协议简单、易理解。
Etcd 作为 CoreOS 开源项目,有以下的特点。
- 简单:使用 Go 语言编写,部署简单;支持 cURL 方式的用户 API (HTTP+JSON),使用简单;开源 Java 客户端使用简单;
- 安全:可选 SSL 证书认证;
- 快速:在保证强一致性的同时,读写性能优秀,详情可查看官方提供的 Benchmark 数据 ;
- 可靠:采用 Raft 算法实现分布式系统数据的高可用性和强一致性。
1.2 分布式锁的基本原理
分布式环境下,多台机器上多个进程对同一个共享资源(数据、文件等)进行操作,如果不做互斥,就有可能出现“余额扣成负数”,或者“商品超卖”的情况。为了解决这个问题,需要分布式锁服务。首先,来看一下分布式锁应该具备哪些条件。
- 互斥性:在任意时刻,对于同一个锁,只有一个客户端能持有,从而保证一个共享资源同一时间只能被一个客户端操作;
- 安全性:即不会形成死锁,当一个客户端在持有锁的期间崩溃而没有主动解锁的情况下,其持有的锁也能够被正确释放,并保证后续其它客户端能加锁;
- 可用性:当提供锁服务的节点发生宕机等不可恢复性故障时,“热备” 节点能够接替故障的节点继续提供服务,并保证自身持有的数据与故障节点一致。
- 对称性:对于任意一个锁,其加锁和解锁必须是同一个客户端,即客户端 A 不能把客户端 B 加的锁给解了。
1.3 Etcd 实现分布式锁的基础
Etcd 的高可用性、强一致性不必多说,前面章节中已经阐明,本节主要介绍 Etcd 支持的以下机制:Watch 机制、Lease 机制、Revision 机制和 Prefix 机制,正是这些机制赋予了 Etcd 实现分布式锁的能力。
- Lease 机制:即租约机制(TTL,Time To Live),Etcd 可以为存储的 Key-Value 对设置租约,当租约到期,Key-Value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,以避免 Key-Value 对过期失效。Lease 机制可以保证分布式锁的安全性,为锁对应的 Key 配置租约,即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放。
- Revision 机制:每个 Key 带有一个 Revision 号,每进行一次事务便加一,因此它是全局唯一的,如初始值为 0,进行一次 put(key, value),Key 的 Revision 变为 1,同样的操作,再进行一次,Revision 变为 2;换成 key1 进行 put(key1, value) 操作,Revision 将变为 3;这种机制有一个作用:通过 Revision 的大小就可以知道写操作的顺序。在实现分布式锁时,多个客户端同时抢锁,根据 Revision 号大小依次获得锁,可以避免 “羊群效应” (也称“惊群效应”),实现公平锁。
- Prefix 机制:即前缀机制,也称目录机制,例如,一个名为 /mylock 的锁,两个争抢它的客户端进行写操作,实际写入的 Key 分别为:key1=“/mylock/UUID1”,key2=“/mylock/UUID2”,其中,UUID 表示全局唯一的 ID,确保两个 Key 的唯一性。很显然,写操作都会成功,但返回的 Revision 不一样,那么,如何判断谁获得了锁呢?通过前缀“/mylock” 查询,返回包含两个 Key-Value 对的 Key-Value 列表,同时也包含它们的 Revision,通过 Revision 大小,客户端可以判断自己是否获得锁,如果抢锁失败,则等待锁释放(对应的 Key 被删除或者租约过期),然后再判断自己是否可以获得锁。
- Watch 机制:即监听机制,Watch 机制支持监听某个固定的 Key,也支持监听一个范围(前缀机制),当被监听的 Key 或范围发生变化,客户端将收到通知;在实现分布式锁时,如果抢锁失败,可通过 Prefix 机制返回的 Key-Value 列表获得 Revision 比自己小且相差最小的 Key(称为 Pre-Key),对 Pre-Key 进行监听,因为只有它释放锁,自己才能获得锁,如果监听到 Pre-Key 的 DELETE 事件,则说明 Pre-Key 已经释放,自己已经持有锁。
2. Etcd 实现分布式锁
2.1 基于 Etcd 的分布式锁业务流程
下面描述了使用 Etcd 实现分布式锁的业务流程,假设对某个共享资源设置的锁名为:/anyrtc/mylock。
步骤1:准备
客户端连接 Etcd,以 /anyrtc/mylock 为前缀创建全局唯一的 Key,假设第一个客户端对应的 Key=“/anyrtc/mylock/UUID1”,第二个为 Key=“/anyrtc/mylock/UUID2”;客户端分别为自己的 Key 创建租约 Lease,租约的长度根据业务耗时确定,假设为 15s。
步骤2:创建定时任务作为租约的“心跳”
在一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,Key 将因租约到期而被删除,从而锁释放,避免死锁。
步骤3:客户端将自己全局唯一的 Key 写入 Etcd
进行 Put 操作,将步骤 1 中创建的 Key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 Put 操作返回的 Revision 分别为1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。
步骤4:客户端判断是否获得锁
客户端以前缀 /anyrtc/mylock 读取 Key-Value 列表(Key-Value 中带有 Key 对应的 Revision),判断自己 Key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 Key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。
步骤5:执行业务
获得锁后,操作共享资源,执行业务代码。
步骤6:释放锁
完成业务流程后,删除对应的 Key 释放锁。
2.2 基于 Etcd 的分布式锁的原理图
根据上一节中介绍的业务流程,基于Etcd的分布式锁示意图如下。
业务流程图大家可参看这篇文章《Zookeeper 分布式锁实现原理》。
2.3 基于golang实现Etcd的分布式锁
分布式锁EtcdLocker代码
package etcd
import (
"context"
clientV3 "go.etcd.io/etcd/client/v3"
"log"
"os"
"time"
)
// EtcdLocker 分布式锁结构体
type EtcdLocker struct
client *clientV3.Client // 连接到etcd的客户端实例
lease clientV3.Lease // 在etcd上的租约实例
leaseId clientV3.LeaseID
cancelFunc context.CancelFunc
option Option
// Option EtcdClient的配置选项
type Option struct
ConnectionTimeout time.Duration // 连接到etcd的超时时间,示例:5*time.Second
LeaseTtl int64 // 租约时长,连接异常断开后,未续租的租约会在这个时间之后失效
Prefix string // 锁前缀
Username string // 用户名,可选
Password string // 密码,可选
Debug bool
// New 创建一把锁
// etcdEndpoints etcd连接信息,示例:[]string"localhost:2379"
// option 连接选项,包clientV3.Config中的配置项很多,我们其实用不到它们那么多,简化一下
func New(etcdEndpoints []string, option Option) (locker *EtcdLocker, err error)
if option.Prefix == ""
option.Prefix = "distribution_lock:"
if option.ConnectionTimeout <= 0
option.ConnectionTimeout = 5 * time.Second
if option.LeaseTtl <= 0
option.LeaseTtl = 5
config := clientV3.Config
Endpoints: etcdEndpoints,
DialTimeout: option.ConnectionTimeout,
Username: option.Username,
Password: option.Password,
locker = &EtcdLocker
option: option,
if locker.client, err = clientV3.New(config); err != nil
return nil, err
var timeoutCtx, cancel = context.Background(), locker.timeoutCancel
timeoutCtx, cancel = context.WithTimeout(context.Background(), option.ConnectionTimeout)
defer cancel()
if _, err := locker.client.Status(timeoutCtx, etcdEndpoints[0]); err != nil
return nil, err
//上锁并创建租约
locker.lease = clientV3.NewLease(locker.client)
var leaseGrantResp *clientV3.LeaseGrantResponse
// 第2个参数TTL,可以用于控制如果当前进程和etcd连接断开了,持有锁的上下文多长时间失效
if leaseGrantResp, err = locker.lease.Grant(context.TODO(), option.LeaseTtl); err != nil
return nil, err
locker.leaseId = leaseGrantResp.ID
var ctx context.Context
// 创建一个可取消的租约,主要是为了退出的时候能够释放
ctx, locker.cancelFunc = context.WithCancel(context.Background())
var keepRespChan <-chan *clientV3.LeaseKeepAliveResponse
if keepRespChan, err = locker.lease.KeepAlive(ctx, locker.leaseId); err != nil
return nil, err
// 续约应答
go func()
for
select
case keepResp := <-keepRespChan:
if keepResp == nil
if locker.option.Debug
log.Printf("进程 %+v 的锁 %+v 的租约已经失效了", os.Getpid(), locker.leaseId)
return
else // 每秒会续租一次, 所以就会收到一次应答
if locker.option.Debug
log.Printf("进程 %+v 收到自动续租应答 %+v", os.Getpid(), keepResp.ID)
()
return locker, nil
func (locker *EtcdLocker) timeoutCancel()
if locker.option.Debug
log.Printf("进程 %+v 的锁操作撤销", os.Getpid())
// GetId 获得当前锁的内部ID
func (locker *EtcdLocker) GetId() int64
return int64(locker.leaseId)
// Acquire 获得锁
// lockerId 锁ID,推荐使用UUID或雪花算法,确保唯一性,防止复杂业务+大量数据的情况下发生锁冲撞
// 返回值:who 如果获得锁失败,此ID可以标示锁现在在谁手中(这个谁,来自于GetId()的返回值
// 换句话说,A进程获得锁之后,可以通过GetId知道自己的ID是多少,此时B进程获得锁失败,可以通过who返回值知道锁在A手中
func (locker *EtcdLocker) Acquire(lockerId string) (who int64, ok bool)
var err error
// 在租约时间内去抢锁(etcd 里面的锁就是一个 key)
kv := clientV3.NewKV(locker.client)
// 创建事务
txn := kv.Txn(context.TODO())
// 定义锁的Key
var lockerKey = locker.option.Prefix + lockerId
// If 不存在 key,Then 设置它,Else 抢锁失败
txn.If(clientV3.Compare(clientV3.CreateRevision(lockerKey), "=", 0)).
Then(clientV3.OpPut(lockerKey, lockerId, clientV3.WithLease(locker.leaseId))).
Else(clientV3.OpGet(lockerKey))
var txnResp *clientV3.TxnResponse
if txnResp, err = txn.Commit(); err != nil
return 0, false
if !txnResp.Succeeded
return txnResp.Responses[0].GetResponseRange().Kvs[0].Lease, false
return 0, true
// Release 释放锁
func (locker *EtcdLocker) Release() error
locker.cancelFunc()
if _, err := locker.lease.Revoke(context.TODO(), locker.leaseId); err != nil
return err
return nil
EtcdLocker Test方法
package etcd
import (
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"testing"
"time"
)
var etcdEndpoint = []string"192.168.1.111:2379"
// 一把锁,开调试
func TestEtcdLockerOneAsDebug(t *testing.T)
option := Option
ConnectionTimeout: 5 * time.Second,
Prefix: "",
Debug: true,
if locker, err := New(etcdEndpoint, option); err != nil
log.Fatalf("创建锁失败:%+v", err)
else if who, ok := locker.Acquire("EtcdLockerOneAsDebug"); ok
// 抢到锁后执行业务逻辑,没有抢到则退出
t.Logf("进程 %+v 持有锁 %+v 正在处理任务中...", os.Getpid(), locker.GetId())
time.Sleep(5 * time.Second) // 这是正在做的事情,假定耗时5秒
t.Logf("进程 %+v 的任务处理完了", os.Getpid())
// 手动释放锁,在后台应用服务中,也可以通过defer释放
if err := locker.Release(); err != nil
log.Fatalf("释放锁失败:%+v", err)
else
time.Sleep(2 * time.Second)
else
t.Logf("获取锁失败,锁现在在 %+v 手中", who)
// 一把锁,不开调试带前缀
func TestEtcdLockerOneNoneDebugAndPrefix(t *testing.T)
option := Option
ConnectionTimeout: 3 * time.Second,
Prefix: "MyEtcdLocker",
Debug: false,
if locker, err := New(etcdEndpoint, option); err != nil
log.Fatalf("创建锁失败:%+v", err)
else if who, ok := locker.Acquire("EtcdLockerOneNoneDebugAndPrefix"); ok
// 抢到锁后执行业务逻辑,没有抢到则退出
t.Logf("进程 %+v 持有锁 %+v 正在处理任务中...", os.Getpid(), locker.GetId())
time.Sleep(5 * time.Second) // 这是正在做的事情,假定耗时5秒
t.Logf("进程 %+v 的任务处理完了", os.Getpid())
// 手动释放锁,在后台应用服务中,也可以通过defer释放
if err := locker.Release(); err != nil
log.Fatalf("释放锁失败:%+v", err)
else
time.Sleep(1 * time.Second)
else
t.Logf("获取锁失败,锁现在在 %+v 手中", who)
// 一把锁,多任务(多请求)竞争锁,
// 此测试用例还可以通过命令 go test -run="TestEtcdLockerMultiTask" 开多个进程进行并行竞争测试
// 多进程测试时的结果验证方法,条件:多个测试只要有一个未完成,预期结果是:获取锁失败,successCount的值就是0
func TestEtcdLockerMultiTask(t *testing.T)
const taskCount = 5
option := Option
ConnectionTimeout: 3 * time.Second,
Prefix: "MyEtcdLocker",
Debug: false,
var successCount int64 = 0
var wg sync.WaitGroup
for i := 0; i < taskCount; i++
wg.Add(1)
go func(taskId int)
defer wg.Done()
if locker, err := New(etcdEndpoint, option); err != nil
log.Fatalf("[%+v]创建锁失败:%+v", taskId, err)
else if who, ok := locker.Acquire("EtcdLockerMulti"); ok
// 抢到锁后执行业务逻辑,没有抢到则退出
t.Logf("[%+v]进程 %+v 持有锁 %+v 正在处理任务中...", taskId, os.Getpid(), locker.GetId())
atomic.AddInt64(&successCount, 1)
time.Sleep(5 * time.Second) // 这是正在做的事情,假定耗时5秒
t.Logf("[%+v]进程 %+v 的任务处理完了", taskId, os.Getpid())
// 手动释放锁,在后台应用服务中,也可以通过defer释放
if err := locker.Release(); err != nil
log.Fatalf("[%+v]释放锁失败:%+v", taskId, err)
else
time.Sleep(1 * time.Second)
else
t.Logf("[%+v]获取锁失败,锁现在在 %+v 手中", taskId, who)
(i)
wg.Wait()
if successCount != 1
t.Fatalf("进程 %+v 的分布式锁功能存在BUG", os.Getpid())
// 多把锁,多任务(多请求),各有各的锁
func TestEtcdLockerMultiBusinessMultiLocker(t *testing.T)
const taskCount = 5
option := Option
ConnectionTimeout: 3 * time.Second,
Prefix: "MyEtcdLocker",
Debug: false,
var successCount int64 = 0
var wg sync.WaitGroup
for i := 0; i < taskCount; i++
wg.Add(1)
go func(taskId int)
defer wg.Done()
if locker, err := New(etcdEndpoint, option); err != nil
log.Fatalf("[%+v]创建锁失败:%+v", taskId, err)
else if who, ok := locker.Acquire(fmt.Sprintf("EtcdLockerMulti_%d", taskId)); ok
// 抢到锁后执行业务逻辑,没有抢到则退出
t.Logf("[%+v]进程 %+v 持有锁 %+v 正在处理任务中...", taskId, os.Getpid(), locker.GetId())
atomic.AddInt64(&successCount, 1)
time.Sleep(8 * time.Second) // 这是正在做的事情,假定耗时8秒
t.Logf("[%+v]进程 %+v 的任务处理完了", taskId, os.Getpid())
// 手动释放锁,在后台应用服务中,也可以通过defer释放
if err := locker.Release(); err != nil
log.Fatalf("[%+v]释放锁失败:%+v", taskId, err)
else
time.Sleep(1 * time.Second)
else
t.Logf("[%+v]获取锁失败,锁现在在 %+v 手中", taskId, who)
(i)
wg.Wait()
if successCount != taskCount
t.Fatalf("进程 %+v 的分布式锁功能存在BUG", os.Getpid())
func TestEtcdLocker_GetId(t *testing.T)
option := Option
ConnectionTimeout: 3 * time.Second,
Prefix: "EtcdLocker_GetId",
Debug: true,
if locker, err := New(etcdEndpoint, option); err != nil
log.Fatalf("创建锁失败:%+v", err)
else if who, ok := locker.Acquire("EtcdLocker_GetId"); ok
// 抢到锁后执行业务逻辑,没有抢到则退出
t.Logf("进程 %+v 持有锁 %+v 正在处理任务中...", os.Getpid(), locker.GetId())
time.Sleep(2 * time.Second) // 这是正在做的事情,假定耗时2秒
t.Logf("进程 %+v 的任务处理完了", os.Getpid())
// 手动释放锁,在后台应用服务中,也可以通过defer释放
if err := locker.Release(); err != nil
log.Fatalf("释放锁失败:%+v", err)
else
time.Sleep(1 * time.Second)
else
t.Logf("获取锁失败,锁现在在 %+v 手中", who)
参考文档
Etcd官网
Etcd
技术文章摘抄
基于 etcd 实现分布式锁
技术分享|分布式系统中服务注册发现组件的原理及比较
背景在分布式架构的系统中,服务发现简单来讲就是通过服务名找到提供服务的实例地址和端口,主要用于解决如何获取服务实例地址问题。随着容器技术的兴起,服务集群部署在系统各处,服务之间的远程调用... 查看详情
技术分享|etcd如何实现分布式负载均衡及分布式通知与协调
消息发布与订阅在分布式系统中,最适用的一种组件间通信方式就是消息发布与订阅。即构建一个配置共享中心,数据提供者在这个配置中心发布消息,而消息使用者则订阅他们关心的主题,一旦主题有消息发布,就会实时通知... 查看详情
技术分享|etcd如何实现分布式负载均衡及分布式通知与协调
Etcd是一个高度一致的分布式键值存储,它提供了一种可靠的方式来存储需要由分布式系统或机器集群访问的数据。Etcd比较多的应用场景是用于服务注册与发现(前面文章已经介绍过),除此之外,也可用于键... 查看详情
基于etcd实现分布式锁&实战:控制多个应用仅一台执行任务(代码片段)
我们知道,分布式锁有好几种方案:基于Redis、基于数据库如MySQL、基于注册中心如Zookeeper等;而K8S体系中基于Go语言编写的的ETCD则对于分布式锁有着更强大的支持。 ETCD有一个租约机制,客户端跟ETCD服务... 查看详情
基于etcd实现分布式锁&实战:控制多个应用仅一台执行任务(代码片段)
我们知道,分布式锁有好几种方案:基于Redis、基于数据库如MySQL、基于注册中心如Zookeeper等;而K8S体系中基于Go语言编写的的ETCD则对于分布式锁有着更强大的支持。 ETCD有一个租约机制,客户端跟ETCD服务... 查看详情
zookeeper--zookeeper分布式锁原理
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区... 查看详情
分布式锁原理及实现方式(代码片段)
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availabi... 查看详情
锁的原理和使用场景,乐观锁悲观锁公平锁非公平锁,基于数据库rediszookeeper实现分布式锁的原理及代码实现(代码片段)
一、锁1.1什么是锁?在JAVA中是一个非常重要的概念,尤其是在当今的互联网时代,高并发的场景下,更是离不开锁。那么锁到底是什么呢?在计算机科学中,锁(lock)或互斥(mutex)是一种同步机制,用于... 查看详情
分布式锁机制原理及实现方式(代码片段)
前言分布式锁,是控制分布式系统之间同步访问共享资源的一种方式在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥... 查看详情
基于redis实现分布式锁,分析解决锁误删情况及利用lua脚本解决原子性问题并改造锁(代码片段)
(目录)上一篇博文部分:优惠卷秒杀分布式锁1、基本原理和实现方式对比分布式锁:分布式锁的核心思想就是:分布式锁他应该满足一些什么样的条件呢?常见的分布式锁有三种:2、Redis分布式锁的实现核心思路实现分布式锁时... 查看详情
分布式锁-常用技术方案
分布式锁的解决方式1、是否可以考虑采用ReentrantLock来实现,但是实际上去实现的时候是有问题的,ReentrantLock的lock和unlock要求必须是在同一线程进行,而分布式应用中,lock和unlock是两次不相关的请求,因此肯定不是同一线程,... 查看详情
一文让你读懂分布式锁的使用原理及实现方式(代码片段)
一、为什么要使用分布式锁分布式环境下修改某个共有的数据,比如redis的共有数据;在同一时间,可能多个节点都先查询这个数据,然后更新。在查询的时候,结果是一样的,但是各个节点更新的时候,就是以最后一个更新为... 查看详情
分布式锁的实现方式及原理
...理一下业务,这是普通的事务是满足不了业务需求,需要分布式锁**分布式锁的常用3种实现:* 0.数据库乐观锁实现* 1. 查看详情
分布式服务框架之远程通讯技术及原理分析
在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,这些名词之间到底是些什么关系呢,它们背后到底是基于什... 查看详情
java远程通讯技术及原理分析
在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,这些名词之间到底是些什么关系呢,它们背后到底是基于什... 查看详情
分布式锁的几种实现方式(代码片段)
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区... 查看详情
分布式锁的三种实现方式及其比较
1实现方式分布式锁的实现,目前比较常用的有以下几种方案:基于Zookeeper实现实现分布式锁基于缓存(如redis等)分布式锁基于数据库实现分布式锁2基于Zookeeper实现实现分布式锁实现原理是:每个客户端对某个方法加锁时,在zo... 查看详情
zookeeper原理详解及常用操作(代码片段)
ZooKeeper是什么?ZooKeeper是一个开源的分布式应用程序协调系统。简称ZK,ZK是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于它实现数据的发布/订阅、负载均衡、名称服务、分布式协调/通知、集群管理、Master选... 查看详情