分布式理论,架构设计自定义rpc(代码片段)

拐柒 拐柒     2023-02-23     719

关键词:

分布式理论,架构设计(四)自定义RPC

自定义RPC

在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现 远程通讯的技术,例如:RMI、Hessian、SOAP、ESB和JMS等。
要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络 通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输 协议,网络IO,主要有bio、nio、aio三种方式,所有的分布式应用通讯都基于这个原理而实现。
RPC全称为remote procedure call,即远程过程调用。借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式。
RPC架构
一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及Server Stub,这个Stub可以理解为存根。
· 客户端(Client),服务的调用方
· 客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方
· 服务端(Server),真正的服务提供者
· 服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。
1.客户端(client)以本地调用方式(即以接口的方式)调用服务;
2.客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体
(将消息体对象序列化为二进制);
3.客户端通过socket将消息发送到服务端;
4.服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
5.服务端存根( server stub)根据解码结果调用本地的服务;
6.服务处理
7.本地服务执行并将结果返回给服务端存根( server stub);
8.服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
9.服务端(server)通过socket将消息发送到客户端;
10.客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
11.客户端(client)得到最终结果。
RPC的目标是要把2、3、4、5、7、8、9、10这些步骤都封装起来。只剩下1、6、11
注意: 无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要 将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。
在java中RPC框架比较多,常见的有Hessian、gRPC、Dubbo 等,其实对 于RPC框架而言,核心模块就是通讯和序列化。

RMI

Java RMI,即远程方法调用(Remote Method Invocation),一种用于实现远程过程调用(RPC- Remote procedure call)的Java API, 能直接传输序列化后的Java对象。它的实现依赖于Java虚拟机,因此它仅支持从一个JVM到另一个JVM的调用。

1.客户端从远程服务器的注册表中查询并获取远程对象引用。
2.桩对象与远程对象具有相同的接口和方法列表,当客户端调用远程对象时,实际上是由相应的桩 对象代理完成的。
3.远程引用层在将桩的本地引用转换为服务器上对象的远程引用后,再将调用传递给传输层(Transport),由传输层通过TCP协议发送调用;
4.在服务器端,传输层监听入站连接,它一旦接收到客户端远程调用后,就将这个引用转发给其上 层的远程引用层; 5)服务器端的远程引用层将客户端发送的远程应用转换为本地虚拟机的引用后,再将请求传递给骨架(Skeleton); 6)骨架读取参数,又将请求传递给服务器,最后由服务 器进行实际的方法调用。
5.如果远程方法调用后有返回值,则服务器将这些结果又沿着“骨架->远程引用层->传输层”向下传递;
6.客户端的传输层接收到返回值后,又沿着“传输层->远程引用层->桩”向上传递,然后由桩来反序列化这些返回值,并将最终的结果传递给客户端程序。
代码实现:
服务端

 try 
            //1、注册registry实例,绑定端口
            Registry registry = LocateRegistry.createRegistry(8088);
            //2、创建远程对象
            UserService userServe=new UserServiceImpl();
            //3、将远程对象注册到RMI服务器上即(服务端注册表上)
            registry.rebind("userService",userServe);
            System.out.println("RMI服务端启动成功");
         catch (RemoteException e) 
            e.printStackTrace();
        

客户端

 try 
            //1、获取registry实例
            Registry registry = LocateRegistry.getRegistry("localhost", 8088);
            //2、通过实例查找对应的远程对象
            UserService userService = (UserService) registry.lookup("userService");
            //3、
            User userById = userService.getUserById(2);
            System.out.println(userById);
         catch (RemoteException | NotBoundException e) 
            e.printStackTrace();
        

基于netty实现RPC框架

dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,
1.创建一个接口,定义抽象方法。用于消费者和提供者之间的约定,
2.创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据
3.创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 进行数据通信
4.提供者与消费者数据传输使用json字符串数据格式
5.提供者使用netty集成spring boot 环境实现

代码实现

服务端代码

注解RpcService

/**
 * 对外暴露服务接口
 */
@Target(ElementType.TYPE) //用于接口和类上
@Retention(RetentionPolicy.RUNTIME) //在运行时可以获取到
public @interface RpcService 



服务端处理类

@Component
@ChannelHandler.Sharable
public class RPCServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware 
    private static final Map SERVICE_INSTANCE_MAP= new ConcurrentHashMap();
    //1、将标有RPCService注解的bean缓存
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
        Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
        if(serviceMap!=null&&serviceMap.size()>0)
            Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();
            for (Map.Entry<String, Object> entry : entries) 
                Object serviceBean = entry.getValue();
                if(serviceBean.getClass().getInterfaces().length==0)
                    throw new RuntimeException("服务必须实现接口");
                
                //默认取第一个接口作为缓存bean的名称
                String name = serviceBean.getClass().getInterfaces()[0].getName();
                SERVICE_INSTANCE_MAP.put(name,serviceBean);
            
        
    

    /**
     * 通道读取就绪事件
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception 
        //1、接受客户端请求,把s转换为RpcRequest对象
        RpcRequest rpcRequest = JSON.parseObject(s, RpcRequest.class);
        RpcResponse response=new RpcResponse();
        response.setRequestId(rpcRequest.getRequestId());
        try 
            //业务处理
            response.setResult(handler(rpcRequest));
         catch (Exception e) 
            e.printStackTrace();
            response.setError(e.getMessage());
        
        System.out.println(JSON.toJSONString(response));
        //给客户端相应
        channelHandlerContext.writeAndFlush(JSON.toJSONString(response));
    

    /**
     * 业务处理逻辑
     * @return
     */
    public Object handler(RpcRequest rpcRequest) throws InvocationTargetException 
        //  3、根据传递过来的beanName从缓存中查找对应的bean
        Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
        if(serviceBean==null)
            throw new RuntimeException("根据beanName找不到对象,beanName:"+rpcRequest.getClassName());
        
        //  4、解析请求中的方法名,参数类型和参数信息
        Class<?> aClass = serviceBean.getClass();
        String methodName = rpcRequest.getMethodName();
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Object[] parameters = rpcRequest.getParameters();
        //  5、反射调用bean方法  使用cglib进行反射调用
        FastClass fastClass=FastClass.create(aClass);
        FastMethod method = fastClass.getMethod(methodName, parameterTypes);
        System.out.println(method.invoke(serviceBean,parameters));
        return method.invoke(serviceBean,parameters);
    



netty服务启动类RPCServer

@Service
public class RPCServer implements DisposableBean 
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workerGroup;
    @Autowired
    RPCServerHandler rpcServerHandler;

    public void startServer(String ip,Integer port)
        try 
        //1、创建线程组
        bossGroup=new NioEventLoopGroup(1);
        workerGroup=new NioEventLoopGroup();
        //2、创建服务端启动助手
        ServerBootstrap serverBootstrap=new ServerBootstrap();
        serverBootstrap.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception 
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        //添加string编解码器
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //添加业务处理类
                        pipeline.addLast(rpcServerHandler);
                    
                );
            ChannelFuture sync = serverBootstrap.bind(ip, port).sync();
            System.out.println("服务端启动成功");
            sync.channel().closeFuture().sync();
         catch (InterruptedException e) 
            e.printStackTrace();
        finally 
            try 
                destroy();
             catch (Exception e) 
                e.printStackTrace();
            
        
    
    @Override
    public void destroy() throws Exception 
        if(bossGroup!=null)
            bossGroup.shutdownGracefully();
        
        if(workerGroup!=null)
            workerGroup.shutdownGracefully();
        
    

逻辑处理实现类UserServiceImpl

@RpcService
@Service
public class UserServiceImpl implements IUserService 
    Map<Object, User> userMap = new HashMap();

    @Override
    public User getById(int id) 
        if (userMap.size() == 0) 
            User user1 = new User();
            user1.setId(1);
            user1.setName("张三");
            User user2 = new User();
            user2.setId(2);
            user2.setName("李四");
            userMap.put(user1.getId(), user1);
            userMap.put(user2.getId(), user2);
        
        return userMap.get(id);
    

启动类ServerBootstrapApplication

@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner 
    @Autowired
    RPCServer rpcServer;
    public static void main(String[] args) 
        SpringApplication.run(ServerBootstrapApplication.class,args);
    

    @Override
    public void run(String... args) throws Exception 
        new Thread(new Runnable() 
            @Override
            public void run() 
                rpcServer.startServer("localhost",8088);
            
        ).start();
    

客户端代码

netty客户端

public class RpcClient 
    private EventLoopGroup group;
    private Channel channel;
    private String ip;
    private Integer port;
    private RPCClientHandler rpcClientHandler=new RPCClientHandler();
    private ExecutorService executorService= Executors.newCachedThreadPool();

    public RpcClient(String ip, Integer port) 
        this.ip = ip;
        this.port = port;
        initClient();
    

    //初始化方法连接线程组
    public void initClient()
        try 
        //1、创建线程组
        group=new NioEventLoopGroup();
        //2、创建启动助手
        Bootstrap bootstrap=new Bootstrap();
        //3、设置参数
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000)
                .handler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception 
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //todo
                        //添加客户端处理类
                        pipeline.addLast(rpcClientHandler);
                    
                );
            channel = bootstrap.connect(ip, port).sync().channel();
         catch (InterruptedException e) 
            e.printStackTrace();
            if (channel!=null)
                channel.close();
            
            if (group!=null)
                group.shutdownGracefully();
            
        
    

    /**
     * 提供给调用者关闭资源的方法
     */
    public void close()
        if (channel!=null)
            channel.close();
        
        if (group!=null)
            group.shutdownGracefully();
        
    
    /**
     * 提供给调用者发送消息的方法
     */
    public Object send(String requestMsg) throws ExecutionException, InterruptedException 
        rpcClientHandler.setRequestMsg(requestMsg);
        Future submit = executorService.submit(rpcClientHandler);
        return submit.get() ;
    

服务端处理类

public class RPCClientHandler extends SimpleChannelInboundHandler<String> implements Callable 
    ChannelHandlerContext ctx;
    String requestMsg;//发送消息
    String responseMsg;//服务端发送过来的消息

    public void setRequestMsg(String requestMsg) 
        this.requestMsg = requestMsg;
    

    /**
     * 通道读取事件
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception 
        responseMsg=s;
        //唤醒等待线程
        notify();
    

    /**
     * 通道连接就绪事件
     * @param channelHandlerContext
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception 
        ctx=channelHandlerContext;
    

    /**
     * 发送消息到服务端
     * @return
     * @throws Exception
     */
    @Override
    public synchronized Object call() throws Exception 
        //消息发送
        ctx.writeAndFlush(requestMsg);
        //线程等待
        wait();
        return responseMsg;
    

客户端代理类

/**
 * 客户端代理类
 * 创建代理对象
 * 1、封装request请求对象
 * 2、创建rpcClient对象
 * 3、发送消息
 * 4、返回结果
 */
public class RPCClientProxy 
    public static Object createProxy(Class serviceClass)
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]serviceClass, new InvocationHandler() 
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable 
                // 1、封装request请求对象
                RpcRequest request=new RpcRequest();
                request.setRequestId(UUID.randomUUID().toString());
                request.setClassName(method.getDeclaringClass().getName());
                request.setParameterTypes(method.getParameterTypes());
                request.setParameters(args);
                request.setMethodName(method.getName());
                // 2、创建rpcClient对象
                RpcClient rpcClient=new RpcClient("localhost",8088);

                try 
                    // 3、发送消息
                    Object responseMsg = rpcClient.send(JSON.toJSONString(request));
                    RpcResponse response = JSON.parseObject(responseMsg.toString(), RpcResponse.class);
                    if(response.getError()!=null)
                        throw new RuntimeException(response.getError());
                    
                    // 4、返回结果
                    Object result = response.getResult();
                    return JSON.parseObject(result.toString(), method.getReturnType());
                 catch (Exception e) 
                    throw e;
                finally 
                    rpcClient.close();
                
            
        );
    

服务端启动类

public class ClientBootStrap 
    public static void main(String[] args) 
        IUserService proxy = (IUserService) RPCClientProxy.createProxy(IUserService.class);
        User byId = proxy.getById(1);
        System.out.println(byId);
    

分布式理论,架构设计自定义rpc(代码片段)

分布式理论,架构设计(四)自定义RPC自定义RPCRMI基于netty实现RPC框架代码实现服务端代码客户端代码自定义RPC在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现... 查看详情

分布式理论,架构设计netty高级应用(代码片段)

分布式理论,架构设计(三)Netty高级应用Netty高级应用HTTP服务器开发服务器开发代码实现网页版聊天室websocketWebSocket和HTTP的区别代码实现springboot+nettynetty中的粘包和拆包粘包和拆包的解决办法Netty高级应用HTTP服... 查看详情

分布式理论,架构设计netty高级应用(代码片段)

分布式理论,架构设计(三)Netty高级应用Netty高级应用HTTP服务器开发服务器开发代码实现网页版聊天室websocketWebSocket和HTTP的区别代码实现springboot+nettynetty中的粘包和拆包粘包和拆包的解决办法Netty高级应用HTTP服... 查看详情

分布式理论,架构设计socket和io模型(代码片段)

分布式理论,架构设计(一)Socket和IO模型Socketsocket整体流程代码实现I/O模型NIO详解Socket和IO模型Socketsocket,套接字,就是两台主机之间的连接端点,TCP/IP协议是传输层协议,主要解决数据如何在网络中... 查看详情

架构设计|分布式事务①概念简介和基础理论(代码片段)

本文源码:GitHub·点这里||GitEE·点这里一、分布式事务简介1、转账经典案例跨地区和机构的转账的业务在实际生活中非常常见,基础流程如下:账户01通过一系列服务和支付的流程,把钱转入账户02,在这一过程中,如果账户01出... 查看详情

架构设计|分布式事务①概念简介和基础理论(代码片段)

本文源码:GitHub·点这里||GitEE·点这里一、分布式事务简介1、转账经典案例跨地区和机构的转账的业务在实际生活中非常常见,基础流程如下:账户01通过一系列服务和支付的流程,把钱转入账户02,在这一过程中,如果账户01出... 查看详情

分布式理论,架构设计netty高级应用(代码片段)

分布式理论,架构设计(三)Netty高级应用Netty高级应用HTTP服务器开发服务器开发代码实现网页版聊天室websocketWebSocket和HTTP的区别代码实现springboot+nettynetty中的粘包和拆包粘包和拆包的解决办法Netty高级应用HTTP服... 查看详情

带头撸抽奖系统,ddd+rpc开发分布式架构!(代码片段)

作者:小傅哥博客:https://bugstack.cn沉淀、分享、成长,让自己和他人都能有所收获!😄一、咋,撸个项目?总有粉丝伙伴问傅哥,有没有能上手练习技术的项目,现在学了这么多技术知识、看... 查看详情

带头撸抽奖系统,ddd+rpc开发分布式架构!(代码片段)

作者:小傅哥博客:https://bugstack.cn沉淀、分享、成长,让自己和他人都能有所收获!😄一、咋,撸个项目?总有粉丝伙伴问傅哥,有没有能上手练习技术的项目,现在学了这么多技术知识、看... 查看详情

重复造轮子系列:分布式rpc框架设计_00(代码片段)

摘要:本文介绍了分布式框架的简单实现,说明了自己的设计思路,以及RPC的一些具体细节。在文末,贴出一些关于rpc的资料。0x00:什么是RPCwiki给出的定义如下:Indistributedcomputing,aremoteprocedurecall(RPC)iswhenacomputerprogramcausesaprocedure(... 查看详情

dubbo的rpc远程过程调用+dubbo的负载均衡+zookeeper注册中心(代码片段)

...的RPC远程过程调用+Dubbo的负载均衡+Zookeeper注册中心分布式基础理论应用架构演变单一应用架构垂直应用架构分布式服务架构RPC远程过程调用dubbo核心概念环境搭建_zookeeper注册中心环境搭建_管理控制台编写提供者,消费... 查看详情

简述rpc原理实现(代码片段)

...业务对技术的要求,技术架构需要从单体应用架构升级到分布式服务架构,来降低公司的技术成本,更好的适应业务的发展。分布式服务架构的诸多优势,这里就不一一列举了,今天围绕的话题是服务框架,为了推行服务化,必... 查看详情

微服务与rpc/grpc(代码片段)

...是用户可以感知最小功能集从广义上来讲,微服务是一种分布式系统解决方案,推动细粒度服务的使用,这些服务协同工作微服务架构微服务架构风格是将单个应用程序作为一组小型服务开发的方法,每个服务程序都在自己的进... 查看详情

设计一个分布式rpc框架(代码片段)

0前言提前先祝大家春节快乐!好了,先简单聊聊。我从事的是大数据开发相关的工作,主要负责的是大数据计算这块的内容。最近Hive集群跑任务总是会出现Thrift连接HS2相关问题,研究了解了下内部原理,突然来了兴趣,就想着... 查看详情

分布式事务专题-基本理论(capbase)(代码片段)

...介绍结尾前言基于微服务架构设计功能,总是绕不开分布式事务的问题,前面已经讲解了分布式事务的基本概念,这一篇文章,主要介绍一下分布式事务基本理论CAP和BASE理论,有了这些理论才能结合业务选择... 查看详情

分布式事务专题-基本理论(capbase)(代码片段)

...介绍结尾前言基于微服务架构设计功能,总是绕不开分布式事务的问题,前面已经讲解了分布式事务的基本概念,这一篇文章,主要介绍一下分布式事务基本理论CAP和BASE理论,有了这些理论才能结合业务选择... 查看详情

分布式事务专题-基本理论(capbase)(代码片段)

...介绍结尾前言基于微服务架构设计功能,总是绕不开分布式事务的问题,前面已经讲解了分布式事务的基本概念,这一篇文章,主要介绍一下分布式事务基本理论CAP和BASE理论,有了这些理论才能结合业务选择... 查看详情

flinkblob架构(代码片段)

...消息长度超出了akka.framesize的大小2.在HA摸式中,利用底层分布式文件系统分发单个高负荷RPC消息,比如:TaskDeploymentDescriptor,给多个接受对象。3.失败导致重新部署过程中复用RPC消息TaskManager的日志文件为了在webui上展示tas 查看详情