关键词:
一、带版本控制的注册中心RPC框架
server端
//注册中心接口 public interface IRegisterCenter public void register(String serviceName,String serviceAddress);
//实现类 package zoorpc.zk; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class RegisterCenter implements IRegisterCenter private CuratorFramework curatorFramework; public RegisterCenter() curatorFramework = CuratorFrameworkFactory.builder().connectString(ZooConfig.CONNECTION_STR) .connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); curatorFramework.start(); @Override public void register(String serviceName, String serviceAddress) // 注册相应服务 String Servicepath = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName; try //判断服务/registrys/product-service/是否存在,否则创建 if (curatorFramework.checkExists().forPath(Servicepath) == null) curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath(Servicepath,"0".getBytes()); //创建服务iP节点 String adressPath = Servicepath+"/"+serviceAddress; String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .forPath(adressPath,"0".getBytes()); System.out.println("服务节点创建成功:"+rsNode); catch (Exception e) // TODO Auto-generated catch block e.printStackTrace();
//常量类 package zoorpc.zk; public class ZooConfig final static String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181"; final static String ZK_REGISTER_PATH = "/registrys";
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface RpcAnnotation /** * 对外发布的接口地址 * @return */ Class<?> value(); //多版本功能扩展 String version() default "";
//服务接口 public interface IHelloWorld public String sayHello(String msg);
//服务接口实现类1,不带版本控制 package zoorpc; import anno.RpcAnnotation; @RpcAnnotation(IHelloWorld.class) public class HelloWorldServiceImpl implements IHelloWorld @Override public String sayHello(String msg) // TODO Auto-generated method stub return "HelloWorld,8080"+msg;
//服务接口实现类2,带版本控制 import anno.RpcAnnotation; @RpcAnnotation(value = IHelloWorld.class,version = "2.0") public class HelloWorldServiceImpl2 implements IHelloWorld @Override public String sayHello(String msg) // TODO Auto-generated method stub return "HelloWorld2,8081"+msg;
//服务发布类 package zoorpc; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import anno.RpcAnnotation; import zoorpc.zk.IRegisterCenter; public class RpcServer private static final ExecutorService executorService = Executors.newCachedThreadPool(); private IRegisterCenter registerCenter;//注册中心 private String serviceAddress;//服务发布地址 //存放服务名称和服务对象之间的关系 Map<String,Object> handlerMap = new HashMap<String,Object>(); public RpcServer(IRegisterCenter registerCenter, String serviceAddress) this.registerCenter = registerCenter; this.serviceAddress = serviceAddress; //绑定服务名称和服务对象 public void bind(Object...services) for(Object service :services ) RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class); String serviceName = rpcAnnotation.value().getName(); //添加版本号控制 String version = rpcAnnotation.version(); if(version!=null && !version.equals("")) serviceName = serviceName+"-"+version; //添加版本号控制 handlerMap.put(serviceName, service);//绑定接口服务名称及对应的服务 //发布服务 public void publisher() ServerSocket serverSocket = null; try String[] split = serviceAddress.split(":"); serverSocket = new ServerSocket(Integer.parseInt(split[1]));//启动一个服务监听 for(String interfaceName : handlerMap.keySet()) registerCenter.register(interfaceName, serviceAddress); System.out.println("服务注册成功:"+interfaceName+"->"+serviceAddress); while(true) Socket socket = serverSocket.accept(); executorService.execute(new ProcessorHandler(socket,handlerMap)); catch (Exception e) e.printStackTrace(); finally if(serverSocket!=null) try serverSocket.close(); catch (IOException e) // TODO Auto-generated catch block e.printStackTrace();
package zoorpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; import java.util.Map; public class ProcessorHandler implements Runnable private Socket socket; private Map<String,Object> handlerMap; public ProcessorHandler(Socket socket, Map<String,Object> handlerMap) this.socket = socket; this.handlerMap = handlerMap; @Override public void run() // TODO 处理请求 ObjectInputStream objectInputStream =null; ObjectOutputStream objectOutputStream =null; try objectInputStream = new ObjectInputStream(socket.getInputStream()); RpcRequest request = (RpcRequest) objectInputStream.readObject(); Object result = invoke(request); objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(result); objectOutputStream.flush(); catch (Exception e) // TODO Auto-generated catch block e.printStackTrace(); finally if(objectInputStream!=null) try objectInputStream.close(); objectOutputStream.close(); catch (Exception e) // TODO Auto-generated catch block e.printStackTrace(); private Object invoke(RpcRequest request) throws Exception, IllegalArgumentException, InvocationTargetException Object[] args = request.getParameters(); Class<?> [] types = new Class[args.length]; for (int i = 0; i < types.length; i++) types[i] = args[i].getClass(); //添加版本号控制 String version = request.getVersion(); String serviceName =request.getClassName(); if(version!=null && !version.equals("")) serviceName =request.getClassName()+"-"+version; //添加版本号控制 //从handlerMap中,根据客户端额请求地址,去拿到响应的服务,通过反射发起调用 //Object service = handlerMap.get(request.getClassName()); Object service = handlerMap.get(serviceName);//添加版本号控制 Method method = service.getClass().getMethod(request.getMethodName(), types); return method.invoke(service, args);
//传输类 package zoorpc; import java.io.Serializable; /** * 传输对象 * @author admin * */ public class RpcRequest implements Serializable private static final long serialVersionUID = 6351477854838485391L; private String className; private String methodName; private Object[] parameters; private String version; public String getVersion() return version; public RpcRequest(String className, String methodName, Object[] parameters, String version) super(); this.className = className; this.methodName = methodName; this.parameters = parameters; this.version = version; public void setVersion(String version) this.version = version; public RpcRequest(String className, String methodName, Object[] parameters) super(); this.className = className; this.methodName = methodName; this.parameters = parameters; public RpcRequest() super(); // TODO Auto-generated constructor stub public String getClassName() return className; public void setClassName(String className) this.className = className; public String getMethodName() return methodName; public void setMethodName(String methodName) this.methodName = methodName; public Object[] getParameters() return parameters; public void setParameters(Object[] parameters) this.parameters = parameters;
//发布服务 package zoorpc; import java.io.IOException; import zoorpc.zk.IRegisterCenter; import zoorpc.zk.RegisterCenter; public class ServerDemo public static void main(String[] args) throws IOException IHelloWorld service = new HelloWorldServiceImpl(); IHelloWorld service2 = new HelloWorldServiceImpl2(); IRegisterCenter registerCenter = new RegisterCenter(); RpcServer server = new RpcServer(registerCenter,"127.0.0.1:8080"); server.bind(service,service2); server.publisher(); System.in.read();
客户端
package zoorpc.zk; public interface IDiscovery /** * 根据请求的服务地址,获取到服务的调用地址 * @param serviceName * @return */ public String Discovery(String serviceName);
package zoorpc.zk; import java.util.ArrayList; import java.util.List; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import zoorpc.loadbalance.ILoadBalance; import zoorpc.loadbalance.RandomLoadBalance; public class Discovery implements IDiscovery private CuratorFramework curatorFramework; List<String> repos = new ArrayList<>(); private String adresses; public Discovery(String adresses) this.adresses = adresses; curatorFramework = CuratorFrameworkFactory.builder().connectString(adresses) .connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); curatorFramework.start(); @Override public String Discovery(String serviceName) String path = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName; ILoadBalance randomLoadBalance = null; try repos = curatorFramework.getChildren().forPath(path); //动态发现节点的变化 registerWatcher(path); //发现多个服务,做负载均衡 randomLoadBalance = new RandomLoadBalance(); catch (Exception e) e.printStackTrace(); return randomLoadBalance.selectHost(repos);//返回调用的服务地址 private void registerWatcher(final String path) throws Exception PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true); PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception repos = curatorFramework.getChildren().forPath(path); ; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); pathChildrenCache.start();
package zoorpc.zk; public class ZooConfig public final static String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181"; public final static String ZK_REGISTER_PATH = "/registrys";
package zoorpc; public interface IHelloWorld public String sayHello(String msg);
package zoorpc; import java.io.Serializable; /** * 传输对象 * @author admin * */ public class RpcRequest implements Serializable private static final long serialVersionUID = 6351477854838485391L; private String className; private String methodName; private Object[] parameters; private String version; public String getVersion() return version; public void setVersion(String version) this.version = version; public RpcRequest(String className, String methodName, Object[] parameters) super(); this.className = className; this.methodName = methodName; this.parameters = parameters; public RpcRequest() super(); // TODO Auto-generated constructor stub public String getClassName() return className; public void setClassName(String className) this.className = className; public String getMethodName() return methodName; public void setMethodName(String methodName) this.methodName = methodName; public Object[] getParameters() return parameters; public void setParameters(Object[] parameters) this.parameters = parameters;
package zoorpc; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import zoorpc.zk.IDiscovery; public class RpcClientProxy private IDiscovery discovery; public RpcClientProxy(IDiscovery discovery) this.discovery = discovery; public <T> T clientProxy(final Class<T> interfaceCls,String version) return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class[] interfaceCls, new RemoteInvocationHandler(version,discovery));
package zoorpc; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.net.Socket; import zoorpc.zk.IDiscovery; public class RemoteInvocationHandler implements InvocationHandler private String version;//添加版本号控制 private IDiscovery discovery; public RemoteInvocationHandler(String version,IDiscovery discovery) this.discovery = discovery; this.version = version; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable // TODO Auto-generated method stub RpcRequest request = new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setVersion(version); String serviceAddress = discovery.Discovery(request.getClassName()); TcpTransport trans = new TcpTransport(serviceAddress); return trans.send(request);
package zoorpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; public class TcpTransport private String serviceAddress; public TcpTransport(String serviceAddress) super(); this.serviceAddress = serviceAddress; Socket newSocket() System.out.println("创建一个连接"); Socket socket = null; try String[] split = serviceAddress.split(":"); socket = new Socket(split[0],Integer.parseInt(split[1])); return socket; catch (Exception e) // TODO: handle exception throw new RuntimeException("连接建立失败!"); public Object send(RpcRequest request) Socket socket = null; ObjectOutputStream objectOutputStream = null; ObjectInputStream objectInputStream = null; try socket = newSocket(); objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(request); objectOutputStream.flush(); objectInputStream = new ObjectInputStream(socket.getInputStream()); Object readObject = objectInputStream.readObject(); return readObject; catch (Exception e) // TODO: handle exception e.printStackTrace(); throw new RuntimeException("连接建立失败!"); finally if(socket!=null) try socket.close(); objectOutputStream.close(); objectInputStream.close(); catch (IOException e) // TODO Auto-generated catch block e.printStackTrace();
package zoorpc.loadbalance; import java.util.List; public interface ILoadBalance public String selectHost(List<String> repos);
package zoorpc.loadbalance; import java.util.List; public abstract class LoadBalance implements ILoadBalance @Override public String selectHost(List<String> repos) if(repos.size()<1) return null; else if(repos.size() ==1) return repos.get(0); else return doSelect(repos); protected abstract String doSelect(List<String> repos);
package zoorpc.loadbalance; import java.util.List; import java.util.Random; public class RandomLoadBalance extends LoadBalance @Override protected String doSelect(List<String> repos) int len = repos.size(); Random random = new Random(); return repos.get(random.nextInt(len));
package zoorpc; import zoorpc.zk.Discovery; import zoorpc.zk.IDiscovery; import zoorpc.zk.ZooConfig; public class ClientDemo public static void main(String[] args) IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR); RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
//IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"");结果:HelloWorld,8080lf IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"2.0");结果:HelloWorld2,8081lf System.out.println(hello.sayHello("lf"));
二、模拟集群
新增发布类:
package zoorpc; import java.io.IOException; import zoorpc.zk.IRegisterCenter; import zoorpc.zk.RegisterCenter; public class LBServerDemo1 //模拟集群 public static void main(String[] args) throws IOException IHelloWorld service = new HelloWorldServiceImpl(); IRegisterCenter registerCenter = new RegisterCenter(); RpcServer server = new RpcServer(registerCenter,"127.0.0.1:8080"); server.bind(service); server.publisher(); System.in.read();
package zoorpc; import java.io.IOException; import zoorpc.zk.IRegisterCenter; import zoorpc.zk.RegisterCenter; public class LBServerDemo2 //模拟集群 public static void main(String[] args) throws IOException IHelloWorld service = new HelloWorldServiceImpl2(); IRegisterCenter registerCenter = new RegisterCenter(); RpcServer server = new RpcServer(registerCenter,"127.0.0.1:8081"); server.bind(service); server.publisher(); System.in.read();
修改示例2类的注解
package zoorpc;
import anno.RpcAnnotation;
//@RpcAnnotation(value = IHelloWorld.class,version = "2.0")
@RpcAnnotation(value = IHelloWorld.class)
public class HelloWorldServiceImpl2 implements IHelloWorld
@Override
public String sayHello(String msg)
// TODO Auto-generated method stub
return "HelloWorld2,8081"+msg;
运行发布类1,2
linux 下查看节点显示:
[zk: localhost:2181(CONNECTED) 13] ls /registrys/zoorpc.IHelloWorld
[127.0.0.1:8081, 127.0.0.1:8080]
[zk: localhost:2181(CONNECTED) 14]
客户端
package zoorpc; import zoorpc.zk.Discovery; import zoorpc.zk.IDiscovery; import zoorpc.zk.ZooConfig; public class LBClientDemo public static void main(String[] args) throws InterruptedException IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR); RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery); for (int i = 0; i < 10; i++) IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,null); System.out.println(hello.sayHello("lf")); Thread.sleep(1000);
运行结果:
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld2,8081lf
实现原理图:
四、集群扩容
一、停机扩容,修改配置
二、逐台扩容,一台台重启
基于zookeeper实现多进程分布式锁
一、zookeeper简介及基本操作Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。当对目录节点监控状态打开时,一旦目录节点的状态发生变化,Watcher 对象的 process 方法... 查看详情
zookeeper介绍
...定义相关概念使用场景与规格价值实现原理本文主要介绍ZooKeeper的定义,Leader、Follower和Observer三种角色概念,znode模型和Watcher机制,并介绍了8中典型的ZooKeeper使用场景,最后介绍了其实现原理。定义ZooKeeper是一... 查看详情
zookeeper介绍
...定义相关概念使用场景与规格价值实现原理本文主要介绍ZooKeeper的定义,Leader、Follower和Observer三种角色概念,znode模型和Watcher机制,并介绍了8中典型的ZooKeeper使用场景,最后介绍了其实现原理。定义ZooKeeper是一... 查看详情
zookeeper监控的原理和使用(代码片段)
1、Watcher机制: Zookeeper允许客户端向服务端的某个Znode注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据Watcher... 查看详情
zookeeperwatcher机制(代码片段)
前言在ZooKeeper中,客户端可以向服务端注册一个监听器,监听某个节点或者其子节点列表,当监听对象发生变化时,服务端就会向指定的客户端发送通知,这是ZooKeeper中的Watcher机制,Watcher机制是ZooKeeper中一个重要的特性,这篇... 查看详情
zookeeper监控的原理和使用(代码片段)
1、Watcher机制: Zookeeper允许客户端向服务端的某个Znode注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据Watcher... 查看详情
zookeeper-watcher机制源码分析
Watcher的基本流程ZooKeeper的Watcher机制,总的来说可以分为三个过程:客户端注册Watcher、服务器处理Watcher和客户端回调Watcher客户端注册watcher有3种方式,getData、exists、getChildren;以如下代码为例来分析整个触发机制的原理ZooKeeperzo... 查看详情
zookeeper-客户端注册watcher实现
(1)调用getData()/getChildren()/exist()三个API,传入Watcher对象(2)标记请求request,封装Watcher到WatchRegistration(3)封装成Packet对象,发服务端发送request(4ÿ 查看详情
zookeeper-客户端注册watcher实现
(1)调用getData()/getChildren()/exist()三个API,传入Watcher对象(2)标记请求request,封装Watcher到WatchRegistration(3)封装成Packet对象,发服务端发送request(4ÿ 查看详情
创建一个zookeeper的会话(实现watcher)
在先前的章节中,我们利用zkCli去了解了一下主要的zookeeper的操作。在接下来的章节中,我们将会学习一下在应用中是怎样利用zookeeper的api的。接下来我们将利用一个程序展示一下,怎样来创建一个回话和监视。那么以下我们将... 查看详情
zookeeper-客户端注册watcher实现
(1)调用getData()/getChildren()/exist()三个API,传入Watcher对象(2)标记请求request,封装Watcher到WatchRegistration(3)封装成Packet对象,发服务端发送request(4)收到服务端响应后,将Watcher注... 查看详情
基于zookeeper简单实现分布式锁
这里利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制。来简单实现分布式锁。主要思想:1、开启10个线程。在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点。2、获取disLocks节点下全部子节点,排序,假设自己的节点编号... 查看详情
利用zookeeper的发布/订阅模式实现配置动态变更
??ZooKeeper的Watcher事件机制可以说分布式场景下的观察者模式的实现。基于这个watcher事件机制,配合注册到特定的ZNode节点,可以实现java应用的配置运行时的变更。在学习zookeeper之前,听同事说配置可以在运行时动态变更,觉得... 查看详情
zookeeper源码分析之watcher机制
一、前言 前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。二、总体框图 对于Watcher机制而言,主要涉及的类主要如下。 说明: Watcher,接口类型,其定义了process方... 查看详情
watcher实现机制之client注冊
Zookeeper提供的了分布式数据的公布/订阅功能,通过Watch机制来实现这样的分布式的通知功能。Zookeeper同意client向server注冊一个Watch监听。当服务端的一些指定的事件触发了这个Watch。就会向指定的client发送一个事件通知来实现分... 查看详情
zookeeper学习-5javaapi操作-watcher监听机制(代码片段)
Watch事件监听概念ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性。ZooKeeper中引入了Watcher... 查看详情
springboot基于zookeeper原生方式实现分布式锁(代码片段)
...ven依赖三、配置3.1、application.yml配置3.2、属性配置类3.3、ZookeeperConfig配置件四、实战4.1、接口4.2、接口核心实现4.3、测试类4.4、结果4.5、关于CountDownLatch结语一、背景 我在之前的文章SpringBoot基于Zookeeper和Curator实现分布式锁... 查看详情
springboot基于zookeeper和curator实现分布式锁并分析其原理(代码片段)
...ven依赖三、配置类3.1、属性配置文件3.2、属性配置类3.3、ZookeeperConfig配置类(重要)3.4、ZookeeperClient配置类(重要)四、业务编写4.1、抽象类AbstractLock4.2、锁使实现(核心)4.3、controller层五、测试5.1、配... 查看详情