网络i/o编程模型21netty的粘包和拆包问题的解决方案(代码片段)

健康平安的活着 健康平安的活着     2023-03-13     587

关键词:

一 问题背景描述

1.1 问题描述

tcp是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务端)都要有一一成对的socket;

客户端为了每次更有效的发送更多的数据给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块然后进行封包。

问题:

这样虽然效率提高了,但是接收端就难于分辨出完整的数据包了,tcp无消息保护边界,需要在接收端处理消息边界问题,也就是我们说的粘包,拆包问题。

1.2 粘包和拆包

假设有两个数据包D1和D2,由于服务端一次读取到直接数是不确定的,所以可能存在以下4种情况:
1.客户端分别发送两个独立的包,D1和D2,没有出现粘包和拆包。

2.服务端一次性接收到了连个数据数据包,D1和D2粘合在一起了,这就是粘包
3.服务端分两次读到了数据,第一次读取完整的D1和D2的一部分,第二次为D2的剩余的内容,这就是拆包。
3.服务端分两次读到了数据,第一次读取D1的一部分,第二次为D1的剩余部分和D2的完整数据,这就是拆包。


二  案例

2.1 客户端代码

1.客户端

public class NettyClient 
    public static void main(String[] args)  throws  Exception
        EventLoopGroup group = new NioEventLoopGroup();
        try 
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        finally 
            group.shutdownGracefully();
        
    

2.初始化代码

public class MyClientInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new MyClientHandler());
    

3.handler处理

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> 
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        //使用客户端发送10条数据 hello,server 编号
        for(int i= 0; i< 10; ++i) 
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        
    

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception 
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("客户端接收到消息=" + message);
        System.out.println("客户端接收消息数量=" + (++this.count));

    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    

2.2 服务端代码 

1.服务端代码

public class NettyTcPNianServer 
    public static void main(String[] args) throws Exception

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try 

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        finally 
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        

    

2.初始化代码

public class MyServerInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
        ChannelPipeline pipeline =socketChannel.pipeline();
        pipeline.addLast(new MyServerHandler());
    

3.handler

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> 
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception 

        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服务器接收到数据 " + message);
        System.out.println("服务器接收到消息量=" + (++this.count));
        //服务器回送数据给客户端, 回送一个随机id ,
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        //cause.printStackTrace();
        ctx.close();
    

2.3 调试

1.启动服务端

2.启动客户端

1.客户端

 2.客户端

三  粘包和拆包解决方案

3.1 解决思路

使用自定义协议+加编码解码,设定每次发送数据的长度,服务器读取数据的长度。避免多读或者少读,造成粘包或者拆包。

3.2 执行流程图

 3.3 代码

3.3.1 自定义协议

public class InfoProtocol 
    private int len; //关键
    private byte[] content;

    public int getLen() 
        return len;
    

    public void setLen(int len) 
        this.len = len;
    

    public byte[] getContent() 
        return content;
    

    public void setContent(byte[] content) 
        this.content = content;
    

3.3.2 解码

public class MyMessageDncoder extends ReplayingDecoder<Void> 
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception 
        System.out.println(" step3: MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码-> MessageProtocol 数据包(对象)
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        //封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
       InfoProtocol messageProtocol = new InfoProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    

3.3.3 编码

public class MyMessageEncoder extends MessageToByteEncoder<InfoProtocol> 
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, InfoProtocol infoProtocol, ByteBuf byteBufOut) throws Exception 
        System.out.println("step2: MyMessageEncoder encode 方法被调用");
        byteBufOut.writeInt(infoProtocol.getLen());
        byteBufOut.writeBytes(infoProtocol.getContent());
    

3.3.4 客户端

public class NettChaBaoClient 
    public static void main(String[] args)  throws  Exception
        EventLoopGroup group = new NioEventLoopGroup();
        try 
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8000).sync();
            channelFuture.channel().closeFuture().sync();
        finally 
            group.shutdownGracefully();
        
    

3.3.5 客户端-初始化

public class MyClientInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel ch) throws Exception 

        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageEncoder()); //加入编码器
        pipeline.addLast(new MyMessageDncoder()); //加入解码器
        pipeline.addLast(new MyClientHandler());
       // pipeline.addLast(new MyMessageEncoder()); //加入编码器
    

3.3.6  客户端-自定义handler

public class MyClientHandler  extends SimpleChannelInboundHandler<InfoProtocol> 
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        //使用客户端发送10条数据 "今天天气冷,吃火锅" 编号
        for(int i = 0; i< 5; i++) 
            String mes = "北京又出现口罩事件了,去酒吧惹的祸!!!";
            byte[] content = mes.getBytes(Charset.forName("utf-8"));
            int length = mes.getBytes(Charset.forName("utf-8")).length;
            System.out.println("===========step1: client 的handler:。。。。。。");
            //创建协议包对象
            InfoProtocol messageProtocol = new InfoProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);
        

    

    //    @Override
    protected void channelRead0(ChannelHandlerContext ctx, InfoProtocol msg) throws Exception 
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("客户端接收到消息如下");
        System.out.println("长度=" + len);
        System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
        System.out.println("客户端接收消息数量=" + (++this.count));

    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        System.out.println("异常消息=" + cause.getMessage());
        ctx.close();
    

3.3.7  服务端

public class NettyChaBaoServer 
    public static void main(String[] args) throws Exception
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
            channelFuture.channel().closeFuture().sync();
        finally 
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    

3.3.8  服务端-初始化

public class MyServerInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel ch) throws Exception 
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageDncoder());//解码器
        pipeline.addLast(new MyMessageEncoder());//编码器
        pipeline.addLast(new MyserverHandler());
    

3.3.9  服务端-自定义handler

public class MyserverHandler extends SimpleChannelInboundHandler<InfoProtocol> 
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, InfoProtocol msg) throws Exception 
        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println();
        System.out.println("服务器接收到信息如下");
        System.out.println("长度=" + len);
        System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
        System.out.println("服务器接收到消息包数量=" + (++this.count));
        //回复消息
        String responseContent = UUID.randomUUID().toString();
        int responseLen = responseContent.getBytes("utf-8").length;
        byte[]  responseContent2 = responseContent.getBytes("utf-8");
        //构建一个协议包
        InfoProtocol messageProtocol = new InfoProtocol();
        messageProtocol.setLen(responseLen);
        messageProtocol.setContent(responseContent2);
        ctx.writeAndFlush(messageProtocol);
        System.out.println("================================================");
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        //cause.printStackTrace();
        ctx.close();
    


3.3.10  调试

1.服务端

 2.客户端

 

解决粘包和拆包问题(代码片段)

...的粘包和拆包问题我们知道,TCP是以一种流的方式来进行网络转播的,当tcp三次握手简历通信后,客户端服务端之间就建立了一种通讯管道,我们可以想象成自来水管道,流出来的水是连城一片的,是没有分界线的。TCP底层并不... 查看详情

详解啥是tcp粘包和拆包现象并演示netty是如何解决的

...因为我们在C/S这种传输模型下,以TCP协议传输的时候,在网络中的byte其实就像是河水,TCP就像一个搬运工,将这流水从一端转送到另一端,这时又分两种情况:下面通过Netty重现TCP粘包和拆包现象。其中关键的代码如下从上面的... 查看详情

服务端netty客户端非netty处理粘包和拆包的问题

...便一直没有处理粘包的问题,今天专门花了时间来搞NETTY的粘包处理,要知道在高并发下,不处理粘包是不可能的,数据流的混乱会造成业务的崩溃什么的我就不说了。所以这个问题在我心里一直是个结。 使用NETTY真的很幸... 查看详情

tcp粘包拆包基本解决方案

上个小节我们浅析了在Netty的使用的时候TCP的粘包和拆包的现象,Netty对此问题提供了相对比较丰富的解决方案 Netty提供了几个常用的解码器,帮助我们解决这些问题,其实上述的粘包和拆包的问题,归根结底的解决方案就是... 查看详情

粘包和拆包(代码片段)

写在前面粘包、拆包是Socket编程中最常遇见的一个问题,本文只对粘包、拆包现象及发生的原因做简要分析,具体如何解决粘包和拆包的问题,在后续文章中会详细介绍。什么是粘包、拆包TCP是个"流"协议,所谓流,就是没有界... 查看详情

netty解决粘包和拆包问题的四种方案(代码片段)

 在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接。由于微服务往对方发送信息的时候,所有的请求都是使... 查看详情

netty解决粘包和拆包问题的四种方案(代码片段)

 在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接。由于微服务往对方发送信息的时候,所有的请求都是使... 查看详情

day476.tcp粘包和拆包及解决方案-netty(代码片段)

...界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题,看一张图示意图TCP粘包、拆包图解对图的说明:假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的࿰... 查看详情

day476.tcp粘包和拆包及解决方案-netty(代码片段)

...界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题,看一张图示意图TCP粘包、拆包图解对图的说明:假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的࿰... 查看详情

tcp的粘包拆包问题+解决方案

 为什么TCP有而UDP没有粘包❓1️⃣因为udp的数据包有保护边界。2️⃣tcp是以字节流的形式,也就是没有边界,所以应用层的数据在传输层的时候就可能会出现粘包和拆包问题。出现这种问题的原因图解 查看详情

网络1-28(代码片段)

...p能不能一次连接多次请求,不等后端返回5.讲讲怎么理解网络编程6.网络编程过程,建立一个socket连接步骤过程7.tcp协议过程8.TCP有哪些状态?9.TCP的LISTEN状态是什么?10.TCP的CLOSE_WAIT状态是什么?11.TCP的TIME_WAIT状态存在的理由 查看详情

tcp粘包和拆包

...两个数据包粘在一起的情况,这就是TCP协议中经常会遇到的粘包以及拆包的问题。传输层的UDP协议是否会发生粘包或者拆包问题?不会。UDP是基于报文发送的,在UDP首部采用了16bit来指示UDP数据报文的长度,因此在应用层能很好... 查看详情

tcp粘包和拆包问题

1)产生TCP粘包和拆包问题的主要原因是,操作系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就... 查看详情

java网络编程——粘包拆包出现的原因及解决方式(代码片段)

在基于TCP协议的网络编程中,不可避免地都会遇到粘包和拆包的问题。什么是粘包和拆包?先来看个例子,还是上篇文章《Java网络编程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》中“IO多路复用模式... 查看详情

java网络编程——粘包拆包出现的原因及解决方式(代码片段)

在基于TCP协议的网络编程中,不可避免地都会遇到粘包和拆包的问题。什么是粘包和拆包?先来看个例子,还是上篇文章《Java网络编程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》中“IO多路复用模式... 查看详情

什么是粘包和拆包,netty如何解决粘包拆包?(代码片段)

Netty粘包拆包TCP粘包拆包是指发送方发送的若干包数据到接收方接收时粘成一包或某个数据包被拆开接收。如下图所示,client发送了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。上图中演示了粘包和拆... 查看详情

tcp粘包和拆包及解决方案(代码片段)

目录 1. TCP粘包和拆包基本介绍2.TCP粘包和拆包现象实例3.TCP粘包和拆包解决方案 4.具体实例1. TCP粘包和拆包基本介绍TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一... 查看详情

分布式理论,架构设计netty高级应用(代码片段)

...websocketWebSocket和HTTP的区别代码实现springboot+nettynetty中的粘包和拆包粘包和拆包的解决办法Netty高级应用HTTP服务器开发Netty的HTTP协议栈无论在性能还是可靠性上,都表 查看详情