zookeeper--基于watcher原理实现带注册中心的rpc框架(代码片段)

flgb flgb     2023-03-09     229

关键词:

一、带版本控制的注册中心RPC框架

  server端

  

//注册中心接口
public interface IRegisterCenter 
	
	public void register(String serviceName,String serviceAddress);

 

//实现类
package zoorpc.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class RegisterCenter implements IRegisterCenter 
	
	private CuratorFramework curatorFramework;
	
	
	public RegisterCenter() 
		curatorFramework = CuratorFrameworkFactory.builder().connectString(ZooConfig.CONNECTION_STR)
				.connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
		curatorFramework.start();
	
	@Override
	public void register(String serviceName, String serviceAddress) 
		
		// 注册相应服务
		String Servicepath = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
		try 
			
			//判断服务/registrys/product-service/是否存在,否则创建
			if (curatorFramework.checkExists().forPath(Servicepath) == null) 
				curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
				.forPath(Servicepath,"0".getBytes());
			
			//创建服务iP节点
			String adressPath = Servicepath+"/"+serviceAddress;
			String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
			.forPath(adressPath,"0".getBytes());
			System.out.println("服务节点创建成功:"+rsNode);
		 catch (Exception e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		
	


  

//常量类
package zoorpc.zk;

public class ZooConfig 
	
	final static String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";
	final static String ZK_REGISTER_PATH = "/registrys";

  

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAnnotation 
	
	/**
	 * 对外发布的接口地址
	 * @return
	 */
	Class<?> value();
	
	//多版本功能扩展
	String version() default "";

  

//服务接口
public interface IHelloWorld 
	
	public String sayHello(String msg);

  

//服务接口实现类1,不带版本控制
package zoorpc;

import anno.RpcAnnotation;

@RpcAnnotation(IHelloWorld.class)
public class HelloWorldServiceImpl implements IHelloWorld 

	@Override
	public String sayHello(String msg) 
		// TODO Auto-generated method stub
		return "HelloWorld,8080"+msg;
	


  

//服务接口实现类2,带版本控制
import anno.RpcAnnotation;

@RpcAnnotation(value = IHelloWorld.class,version = "2.0")
public class HelloWorldServiceImpl2 implements IHelloWorld 

    @Override
    public String sayHello(String msg) 
        // TODO Auto-generated method stub
        return "HelloWorld2,8081"+msg;
    

//服务发布类
package zoorpc;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import anno.RpcAnnotation;
import zoorpc.zk.IRegisterCenter;

public class RpcServer 
	
	private static final ExecutorService  executorService = Executors.newCachedThreadPool();
	
	private IRegisterCenter registerCenter;//注册中心
	private String serviceAddress;//服务发布地址
	//存放服务名称和服务对象之间的关系
	Map<String,Object> handlerMap = new HashMap<String,Object>();
	
	public RpcServer(IRegisterCenter registerCenter, String serviceAddress) 
		this.registerCenter = registerCenter;
		this.serviceAddress = serviceAddress;
	
	//绑定服务名称和服务对象
	public void bind(Object...services)
		for(Object service :services )
			RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class);
			String serviceName = rpcAnnotation.value().getName();
			//添加版本号控制
			String version = rpcAnnotation.version();
			if(version!=null && !version.equals(""))
				serviceName = serviceName+"-"+version;
			
			//添加版本号控制
			handlerMap.put(serviceName, service);//绑定接口服务名称及对应的服务
		
	
	//发布服务
	public void publisher()
		ServerSocket serverSocket = null;
		try 
			String[] split = serviceAddress.split(":");
			serverSocket = new ServerSocket(Integer.parseInt(split[1]));//启动一个服务监听
			for(String interfaceName : handlerMap.keySet())
				registerCenter.register(interfaceName, serviceAddress);
				System.out.println("服务注册成功:"+interfaceName+"->"+serviceAddress);
			
			while(true)
				Socket socket = serverSocket.accept();
				executorService.execute(new ProcessorHandler(socket,handlerMap));
			
		 catch (Exception e) 
			e.printStackTrace();
		finally 
			if(serverSocket!=null)
				try 
					serverSocket.close();
					
				 catch (IOException e) 
					// TODO Auto-generated catch block
					e.printStackTrace();
				
			
		
		
	

  

package zoorpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

public class ProcessorHandler implements Runnable 
	
	private Socket socket;
	private Map<String,Object> handlerMap;
	
	
	public ProcessorHandler(Socket socket, Map<String,Object> handlerMap) 
		this.socket = socket;
		this.handlerMap = handlerMap;
	


	@Override
	public void run() 
		// TODO 处理请求
		ObjectInputStream objectInputStream =null;
		ObjectOutputStream objectOutputStream =null;
		try 
			objectInputStream = new ObjectInputStream(socket.getInputStream());
			RpcRequest request = (RpcRequest) objectInputStream.readObject();
			Object result = invoke(request);
			objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
			objectOutputStream.writeObject(result);
			objectOutputStream.flush();
		 catch (Exception e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		finally
			if(objectInputStream!=null)
				try 
					objectInputStream.close();
					objectOutputStream.close();
				 catch (Exception e) 
					// TODO Auto-generated catch block
					e.printStackTrace();
				
			
		
	
	
	private Object invoke(RpcRequest request) throws Exception, IllegalArgumentException, InvocationTargetException
		Object[] args = request.getParameters();
		Class<?> [] types = new Class[args.length];
		for (int i = 0; i < types.length; i++) 
			types[i] = args[i].getClass();
		
		//添加版本号控制
		String version = request.getVersion();
		String serviceName =request.getClassName();
		if(version!=null && !version.equals(""))
			serviceName =request.getClassName()+"-"+version;
		
		//添加版本号控制
		//从handlerMap中,根据客户端额请求地址,去拿到响应的服务,通过反射发起调用
		//Object service = handlerMap.get(request.getClassName());
		Object service = handlerMap.get(serviceName);//添加版本号控制
		Method method = service.getClass().getMethod(request.getMethodName(), types);
		return method.invoke(service, args);
	

  

//传输类
package zoorpc;

import java.io.Serializable;
/**
 * 传输对象
 * @author admin
 *
 */
public class RpcRequest implements Serializable
	
	private static final long serialVersionUID = 6351477854838485391L;
	private String className;
	private String methodName;
	private Object[] parameters;
	private String version;
	
	public String getVersion() 
		return version;
	
	
	public RpcRequest(String className, String methodName, Object[] parameters, String version) 
		super();
		this.className = className;
		this.methodName = methodName;
		this.parameters = parameters;
		this.version = version;
	

	public void setVersion(String version) 
		this.version = version;
	

	public RpcRequest(String className, String methodName, Object[] parameters) 
		super();
		this.className = className;
		this.methodName = methodName;
		this.parameters = parameters;
	
	
	public RpcRequest() 
		super();
		// TODO Auto-generated constructor stub
	

	public String getClassName() 
		return className;
	
	public void setClassName(String className) 
		this.className = className;
	
	public String getMethodName() 
		return methodName;
	
	public void setMethodName(String methodName) 
		this.methodName = methodName;
	
	public Object[] getParameters() 
		return parameters;
	
	public void setParameters(Object[] parameters) 
		this.parameters = parameters;
	
	

  

//发布服务
package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

public class ServerDemo 
	
	public static void main(String[] args) throws IOException 
		IHelloWorld service = new HelloWorldServiceImpl();
		IHelloWorld service2 = new HelloWorldServiceImpl2();
		IRegisterCenter registerCenter = new RegisterCenter();
		RpcServer server  = new RpcServer(registerCenter,"127.0.0.1:8080");
		server.bind(service,service2);
		server.publisher();
		System.in.read();
	

  客户端

package zoorpc.zk;

public interface IDiscovery 
    
    /**
     * 根据请求的服务地址,获取到服务的调用地址
     * @param serviceName
     * @return
     */
    public String Discovery(String serviceName);
package zoorpc.zk;

import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import zoorpc.loadbalance.ILoadBalance;
import zoorpc.loadbalance.RandomLoadBalance;

public class Discovery implements IDiscovery 
	
private CuratorFramework curatorFramework;
	
	List<String> repos = new ArrayList<>();
	private String adresses;
	public Discovery(String adresses) 
		this.adresses = adresses;
		curatorFramework = CuratorFrameworkFactory.builder().connectString(adresses)
				.connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
		curatorFramework.start();
	
	@Override
	public String Discovery(String serviceName) 
		String path = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
		ILoadBalance randomLoadBalance = null;
		try 
			repos = curatorFramework.getChildren().forPath(path);
			//动态发现节点的变化
			registerWatcher(path);
			//发现多个服务,做负载均衡
			randomLoadBalance = new RandomLoadBalance();
			
		 catch (Exception e) 
			e.printStackTrace();
		
	
		return randomLoadBalance.selectHost(repos);//返回调用的服务地址
	

	private void registerWatcher(final String path) throws Exception
		PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
		PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() 
			
			@Override
			public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception 
				repos = curatorFramework.getChildren().forPath(path);
			
		;
		pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
		pathChildrenCache.start();
	

  

package zoorpc.zk;

public class ZooConfig 
    
    public final static String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";
    public final static String ZK_REGISTER_PATH = "/registrys";
package zoorpc;

public interface IHelloWorld 
	
	public String sayHello(String msg);

  

package zoorpc;

import java.io.Serializable;
/**
 * 传输对象
 * @author admin
 *
 */
public class RpcRequest implements Serializable
    
    private static final long serialVersionUID = 6351477854838485391L;
    private String className;
    private String methodName;
    private Object[] parameters;
    private String version;
    
    public String getVersion() 
        return version;
    

    public void setVersion(String version) 
        this.version = version;
    

    public RpcRequest(String className, String methodName, Object[] parameters) 
        super();
        this.className = className;
        this.methodName = methodName;
        this.parameters = parameters;
    
    
    public RpcRequest() 
        super();
        // TODO Auto-generated constructor stub
    

    public String getClassName() 
        return className;
    
    public void setClassName(String className) 
        this.className = className;
    
    public String getMethodName() 
        return methodName;
    
    public void setMethodName(String methodName) 
        this.methodName = methodName;
    
    public Object[] getParameters() 
        return parameters;
    
    public void setParameters(Object[] parameters) 
        this.parameters = parameters;
    
    
package zoorpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import zoorpc.zk.IDiscovery;

public class RpcClientProxy 
	
	private IDiscovery discovery;
	
	public RpcClientProxy(IDiscovery discovery) 
		this.discovery = discovery;
	

	public <T> T clientProxy(final Class<T> interfaceCls,String version)
		
		return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
				new Class[] interfaceCls,
                 new RemoteInvocationHandler(version,discovery));
		
	 

  

package zoorpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;

import zoorpc.zk.IDiscovery;



public class RemoteInvocationHandler implements InvocationHandler 
	
	private String version;//添加版本号控制
	private IDiscovery discovery;
	
	public RemoteInvocationHandler(String version,IDiscovery discovery) 
		this.discovery = discovery;
		this.version = version;
	

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable 
		// TODO Auto-generated method stub
		RpcRequest request = new RpcRequest();
		request.setClassName(method.getDeclaringClass().getName());
		request.setMethodName(method.getName());
		request.setParameters(args);
		request.setVersion(version);
		String serviceAddress = discovery.Discovery(request.getClassName());
		TcpTransport trans = new TcpTransport(serviceAddress);
		return trans.send(request);
	


  

package zoorpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;


public class TcpTransport 
	
	private String serviceAddress;
	
	public TcpTransport(String serviceAddress) 
		super();
		this.serviceAddress = serviceAddress;
	
	
	Socket newSocket()
		System.out.println("创建一个连接");
		Socket socket = null;
		try 
			String[] split = serviceAddress.split(":");
			socket = new Socket(split[0],Integer.parseInt(split[1]));
			return socket;
		 catch (Exception e) 
			// TODO: handle exception
			throw new RuntimeException("连接建立失败!");
		
	
	
	public Object send(RpcRequest request)
		Socket socket = null;
		ObjectOutputStream objectOutputStream = null;
		ObjectInputStream objectInputStream = null;
		try 
			socket = newSocket();
			
			objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
			objectOutputStream.writeObject(request);
			objectOutputStream.flush();
			
			objectInputStream = new ObjectInputStream(socket.getInputStream());
			Object readObject = objectInputStream.readObject();
			return readObject;
		 catch (Exception e) 
			// TODO: handle exception
			e.printStackTrace();
			throw new RuntimeException("连接建立失败!");
		finally 
			if(socket!=null)
				try 
					socket.close();
					objectOutputStream.close();
					objectInputStream.close();
				 catch (IOException e) 
					// TODO Auto-generated catch block
					e.printStackTrace();
				
			
		
	


  

package zoorpc.loadbalance;

import java.util.List;

public interface ILoadBalance 
	
	public String selectHost(List<String> repos);
		

  

package zoorpc.loadbalance;

import java.util.List;

public abstract class LoadBalance implements ILoadBalance

    @Override
    public String selectHost(List<String> repos) 
        if(repos.size()<1)
            return null;
        else if(repos.size() ==1)
            return repos.get(0);
        else
            return doSelect(repos);
        
    
    
    protected abstract String doSelect(List<String> repos);
package zoorpc.loadbalance;

import java.util.List;
import java.util.Random;

public class RandomLoadBalance extends LoadBalance 

    @Override
    protected String doSelect(List<String> repos) 
        int len = repos.size();
        Random random = new Random();
        return repos.get(random.nextInt(len));
        
    

package zoorpc;

import zoorpc.zk.Discovery;
import zoorpc.zk.IDiscovery;
import zoorpc.zk.ZooConfig;

public class ClientDemo 
	public static void main(String[] args) 
		IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
		RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
         //IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"");结果:HelloWorld,8080lf IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"2.0");结果:HelloWorld2,8081lf System.out.println(hello.sayHello("lf"));

  二、模拟集群

   新增发布类:

  

package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

public class LBServerDemo1 
    //模拟集群
    public static void main(String[] args) throws IOException 
        IHelloWorld service = new HelloWorldServiceImpl();
        IRegisterCenter registerCenter = new RegisterCenter();
        RpcServer server  = new RpcServer(registerCenter,"127.0.0.1:8080");
        server.bind(service);
        server.publisher();
        System.in.read();
    
package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

public class LBServerDemo2 
    //模拟集群
    public static void main(String[] args) throws IOException 
        IHelloWorld service = new HelloWorldServiceImpl2();
        IRegisterCenter registerCenter = new RegisterCenter();
        RpcServer server  = new RpcServer(registerCenter,"127.0.0.1:8081");
        server.bind(service);
        server.publisher();
        System.in.read();
    

修改示例2类的注解

package zoorpc;

import anno.RpcAnnotation;

//@RpcAnnotation(value = IHelloWorld.class,version = "2.0")
@RpcAnnotation(value = IHelloWorld.class)
public class HelloWorldServiceImpl2 implements IHelloWorld 

	@Override
	public String sayHello(String msg) 
		// TODO Auto-generated method stub
		return "HelloWorld2,8081"+msg;
	


  运行发布类1,2

  linux 下查看节点显示:

[zk: localhost:2181(CONNECTED) 13] ls /registrys/zoorpc.IHelloWorld
[127.0.0.1:8081, 127.0.0.1:8080]
[zk: localhost:2181(CONNECTED) 14]

 客户端

package zoorpc;

import zoorpc.zk.Discovery;
import zoorpc.zk.IDiscovery;
import zoorpc.zk.ZooConfig;

public class LBClientDemo 
    public static void main(String[] args) throws InterruptedException 
        IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
        RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
        for (int i = 0; i < 10; i++) 
            IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,null);
            System.out.println(hello.sayHello("lf"));
            Thread.sleep(1000);
        
    

运行结果:

创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld2,8081lf

实现原理图:

技术图片

 四、集群扩容

  一、停机扩容,修改配置

  二、逐台扩容,一台台重启

基于zookeeper实现多进程分布式锁

一、zookeeper简介及基本操作Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。当对目录节点监控状态打开时,一旦目录节点的状态发生变化,Watcher 对象的 process 方法... 查看详情

zookeeper介绍

...定义相关概念使用场景与规格价值实现原理本文主要介绍ZooKeeper的定义,Leader、Follower和Observer三种角色概念,znode模型和Watcher机制,并介绍了8中典型的ZooKeeper使用场景,最后介绍了其实现原理。定义ZooKeeper是一... 查看详情

zookeeper介绍

...定义相关概念使用场景与规格价值实现原理本文主要介绍ZooKeeper的定义,Leader、Follower和Observer三种角色概念,znode模型和Watcher机制,并介绍了8中典型的ZooKeeper使用场景,最后介绍了其实现原理。定义ZooKeeper是一... 查看详情

zookeeper监控的原理和使用(代码片段)

1、Watcher机制:  Zookeeper允许客户端向服务端的某个Znode注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据Watcher... 查看详情

zookeeperwatcher机制(代码片段)

前言在ZooKeeper中,客户端可以向服务端注册一个监听器,监听某个节点或者其子节点列表,当监听对象发生变化时,服务端就会向指定的客户端发送通知,这是ZooKeeper中的Watcher机制,Watcher机制是ZooKeeper中一个重要的特性,这篇... 查看详情

zookeeper监控的原理和使用(代码片段)

1、Watcher机制:  Zookeeper允许客户端向服务端的某个Znode注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据Watcher... 查看详情

zookeeper-watcher机制源码分析

Watcher的基本流程ZooKeeper的Watcher机制,总的来说可以分为三个过程:客户端注册Watcher、服务器处理Watcher和客户端回调Watcher客户端注册watcher有3种方式,getData、exists、getChildren;以如下代码为例来分析整个触发机制的原理ZooKeeperzo... 查看详情

zookeeper-客户端注册watcher实现

(1)调用getData()/getChildren()/exist()三个API,传入Watcher对象(2)标记请求request,封装Watcher到WatchRegistration(3)封装成Packet对象,发服务端发送request(4ÿ 查看详情

zookeeper-客户端注册watcher实现

(1)调用getData()/getChildren()/exist()三个API,传入Watcher对象(2)标记请求request,封装Watcher到WatchRegistration(3)封装成Packet对象,发服务端发送request(4ÿ 查看详情

创建一个zookeeper的会话(实现watcher)

在先前的章节中,我们利用zkCli去了解了一下主要的zookeeper的操作。在接下来的章节中,我们将会学习一下在应用中是怎样利用zookeeper的api的。接下来我们将利用一个程序展示一下,怎样来创建一个回话和监视。那么以下我们将... 查看详情

zookeeper-客户端注册watcher实现

(1)调用getData()/getChildren()/exist()三个API,传入Watcher对象(2)标记请求request,封装Watcher到WatchRegistration(3)封装成Packet对象,发服务端发送request(4)收到服务端响应后,将Watcher注... 查看详情

基于zookeeper简单实现分布式锁

这里利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制。来简单实现分布式锁。主要思想:1、开启10个线程。在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点。2、获取disLocks节点下全部子节点,排序,假设自己的节点编号... 查看详情

利用zookeeper的发布/订阅模式实现配置动态变更

??ZooKeeper的Watcher事件机制可以说分布式场景下的观察者模式的实现。基于这个watcher事件机制,配合注册到特定的ZNode节点,可以实现java应用的配置运行时的变更。在学习zookeeper之前,听同事说配置可以在运行时动态变更,觉得... 查看详情

zookeeper源码分析之watcher机制

一、前言  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。二、总体框图  对于Watcher机制而言,主要涉及的类主要如下。    说明:  Watcher,接口类型,其定义了process方... 查看详情

watcher实现机制之client注冊

Zookeeper提供的了分布式数据的公布/订阅功能,通过Watch机制来实现这样的分布式的通知功能。Zookeeper同意client向server注冊一个Watch监听。当服务端的一些指定的事件触发了这个Watch。就会向指定的client发送一个事件通知来实现分... 查看详情

zookeeper学习-5javaapi操作-watcher监听机制(代码片段)

Watch事件监听概念ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性。ZooKeeper中引入了Watcher... 查看详情

springboot基于zookeeper原生方式实现分布式锁(代码片段)

...ven依赖三、配置3.1、application.yml配置3.2、属性配置类3.3、ZookeeperConfig配置件四、实战4.1、接口4.2、接口核心实现4.3、测试类4.4、结果4.5、关于CountDownLatch结语一、背景  我在之前的文章SpringBoot基于Zookeeper和Curator实现分布式锁... 查看详情

springboot基于zookeeper和curator实现分布式锁并分析其原理(代码片段)

...ven依赖三、配置类3.1、属性配置文件3.2、属性配置类3.3、ZookeeperConfig配置类(重要)3.4、ZookeeperClient配置类(重要)四、业务编写4.1、抽象类AbstractLock4.2、锁使实现(核心)4.3、controller层五、测试5.1、配... 查看详情