使用newlife网络库管道模式解决数据粘包(代码片段)

yushuo yushuo     2022-11-28     759

关键词:

上一篇我们讲了 如何创建一个基本的Newlife网络服务端 这边我们来讲一下如何解决粘包的问题

在上一篇总我们注册了Newlife的管道处理器 ,我们来看看他是如何实现粘包处理的

svr.Add<ReciveFilter>();//粘包处理管道

首先看一下我们设备的上传数据协议

 

 设备上报的数据包头包含了固定的包头包尾,整个包的数据长度,设备编号。

 包头:板卡类型,帧类型 2个字节 0x01 0x70

 帧长度: 为两个字节 并且数据的字节序为  高字节在前 ,C#正常默认为低字节在前。

 设备号:15位的ASCII 字符串

 包尾: 两个字节 0x0D 0x0A  固定

 下面来解决粘包的问题

  Newlife网络库提供了几种常见的封包协议来解决粘包的问题,其中有一个 LengthFieldCodec解码器 这个解码器以长度字段作为头部 恰好符合我们的需求,我们就以这个解码器稍作改造来解决我们的粘包问题吧

  由于这个解码器是适用于 只包含包头和包体的数据结构,且长度为包体的长度,而我们的协议 是包含包头包体包尾,并且帧长度为整个包的长度,长度为高位在前的数据结构,所以我们需要对整个解码器稍微做一些改造来符合我们的数据结构 。

  我们来看下代码 其中

  

        #region 属性
        /// <summary>长度所在位置</summary>
        public Int32 Offset  get; set; =2;

        /// <summary>长度占据字节数,1/2/4个字节,0表示压缩编码整数,默认2</summary>
        public Int32 Size  get; set;  = 2;

        /// <summary>过期时间,超过该时间后按废弃数据处理,默认500ms</summary>
        public Int32 Expire  get; set;  = 500;
        #endregion

在我们的协议中可以看到 设置了数据包的长度位置,长度占据的字节数,下面我们来获取一下整个包的长度

 

/// <summary>从数据流中获取整帧数据长度</summary>
        /// <param name="pk"></param>
        /// <param name="offset"></param>
        /// <param name="size"></param>
        /// <returns>数据帧长度(包含头部长度位)</returns>
        protected  Int32 GetLength(Packet pk, Int32 offset, Int32 size)
        
            if (offset < 0) return pk.Total - pk.Offset;
            // 数据不够,连长度都读取不了
            if (offset >= pk.Total) return 0;

            // 读取大小
            var len = 0;
            switch (size)
            
                case 2:
                    var lenArry = pk.ReadBytes(offset, 2);
                    //高位在前,反转数组,获取长度
                    Array.Reverse(lenArry);
                    len = lenArry.ToUInt16();
                    break;
                default:
                    throw new NotSupportedException();
            

            // 判断后续数据是否足够
            if (len > pk.Total) return 0;

            return len;
        

获取长度后我们就可以从数据流中读取一个完整的包了

 

        /// <summary>解码</summary>
        /// <param name="context"></param>
        /// <param name="pk"></param>
        /// <returns></returns>
        protected override IList<Packet> Decode(IHandlerContext context, Packet pk)
        
            var ss = context.Owner as IExtend;
            var mcp = ss["CodecItem"] as CodecItem;
            if (mcp == null) ss["CodecItem"] = mcp = new CodecItem();

            var pks = ParseNew(pk, mcp, 0, ms => GetLength(ms, Offset, Size), Expire);

            // 跳过头部长度
            var len = Offset + Math.Abs(Size);
            foreach (var item in pks)
            
                item.Set(item.Data, item.Offset + len, item.Count - len);
                //item.SetSub(len, item.Count - len);
            

            return pks;
        


        #region 粘包处理
        /// <summary>分析数据流,得到一帧数据</summary>
        /// <param name="pk">待分析数据包</param>
        /// <param name="codec">参数</param>
        /// <param name="getLength">获取长度</param>
        /// <param name="expire">缓存有效期</param>
        /// <returns></returns>
        protected IList<Packet> ParseNew(Packet pk, CodecItem codec, int startIndex, Func<Packet, Int32> getLength, Int32 expire = 5000)
        
            var _ms = codec.Stream;
            var nodata = _ms == null || _ms.Position < 0 || _ms.Position >= _ms.Length;

            var list = new List<Packet>();
            // 内部缓存没有数据,直接判断输入数据流是否刚好一帧数据,快速处理,绝大多数是这种场景
            if (nodata)
            
                if (pk == null) return list.ToArray();

                var idx = 0;
                while (idx < pk.Total)
                
                    //var pk2 = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                    var pk2 = pk.Slice(idx);
                    var len = getLength(pk2);
                    if (len <= 0 || len > pk2.Count) break;

                    pk2.Set(pk2.Data, startIndex, len);
                    //pk2.SetSub(0, len);
                    list.Add(pk2);
                    idx += len;
                
                // 如果没有剩余,可以返回
                if (idx == pk.Total) return list.ToArray();

                // 剩下的
                //pk = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                pk = pk.Slice(idx);
            

            if (_ms == null) codec.Stream = _ms = new MemoryStream();

            // 加锁,避免多线程冲突
            lock (_ms)
            
                // 超过该时间后按废弃数据处理
                var now = TimerX.Now;
                if (_ms.Length > _ms.Position && codec.Last.AddMilliseconds(expire) < now)
                
                    _ms.SetLength(0);
                    _ms.Position = 0;
                
                codec.Last = now;

                // 合并数据到最后面
                if (pk != null && pk.Total > 0)
                
                    var p = _ms.Position;
                    _ms.Position = _ms.Length;
                    pk.WriteTo(_ms);
                    _ms.Position = p;
                

                // 尝试解包
                while (_ms.Position < _ms.Length)
                
                    //var pk2 = new Packet(_ms.GetBuffer(), (Int32)_ms.Position, (Int32)_ms.Length);
                    var pk2 = new Packet(_ms);
                    var len = getLength(pk2);

                    // 资源不足一包
                    if (len <= 0 || len > pk2.Total) break;

                    // 解包成功
                    pk2.Set(pk2.Data, startIndex, len);
                    //pk2.SetSub(0, len);
                    list.Add(pk2);

                    _ms.Seek(len, SeekOrigin.Current);
                

                // 如果读完了数据,需要重置缓冲区
                if (_ms.Position >= _ms.Length)
                
                    _ms.SetLength(0);
                    _ms.Position = 0;
                

                return list;
            
        

   粘包处理管道完成后,就可以在Recive中去处理一个完整的数据包啦,我来解析一下这个状态的数据并且来保存设备连接  

  首先定义一个字典项用来保存设备的连接信息.设备号,连接的SessionId

  

   /// <summary>
        /// newLife连接保持 
        /// </summary>
        private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();



  由于我们的数据中 帧类型不同的请求中帧类型是不一样的 所以解析数据需要做区分处理 我们来或者状态上传信息中的设备号并且和连接关联

 private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();
        private object _lock=new object();
        private void Recive(object sender, ReceivedEventArgs e)
        

            INetSession session = (INetSession)sender;
            var pk = e.Message as Packet;
            if (pk.Count == 0)
            
                XTrace.WriteLine("数据包解析错误");
                return;

            
            try
            
                //数据包
                var respBytes = pk.Data;
                //获取帧类型
                var dataTypeBytes = respBytes[1];

                if (dataTypeBytes == 0x70)
                
                    //数值
                    byte[] deviceNoByte = new byte[15];
                    Buffer.BlockCopy(respBytes, 4, deviceNoByte, 0, 15); //从缓冲区里读取包头的字节
                    string deviceNo = Encoding.ASCII.GetString(deviceNoByte);
                    XTrace.WriteLine("设备编号:" + deviceNo);
//保存连接信息
SaveClientConnection(deviceNo, session.ID);
//获取设备号后保存连接信息 //支付宝 catch (Exception ex) XTrace.WriteLine(ex.Message); /// <summary> /// 保存在线信息 /// </summary> /// <param name="deviceNo"></param> /// <param name="sessionId"></param> private void SaveClientConnection(string deviceNo, int sessionId) lock (_lock) if (OnLineClients.ContainsKey(deviceNo)) OnLineClients[deviceNo] = sessionId; else OnLineClients.Add(deviceNo,sessionId);

 

好了数据粘包问题解决啦同时保存了设备连接信息,下面来解决如何定时检查测试在线状态。

 

    

 

  

  

java网络编程——粘包拆包出现的原因及解决方式(代码片段)

...程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》中“IO多路复用模式”一节中的代码:服务端@Slf4jpublicclassNIOServerpublicstaticvoidmain(Str 查看详情

java网络编程——粘包拆包出现的原因及解决方式(代码片段)

...程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》中“IO多路复用模式”一节中的代码:服务端@Slf4jpublicclassNIOServerpublicstaticvoidmain(Str 查看详情

java网络编程——粘包拆包出现的原因及解决方式(代码片段)

...程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》中“IO多路复用模式”一节中的代码:服务端@Slf4jpublicclassNIOServerpublicstaticvoidmain(String[]args)throwsExceptionServerSocketChannelserverSocketChannel=ServerSocketChannel.open();ser... 查看详情

java网络编程——粘包拆包出现的原因及解决方式(代码片段)

...程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》中“IO多路复用模式”一节中的代码:服务端@Slf4jpublicclassNIOServerpublicstaticvoidmain(String[]args)throwsExceptionServerSocketChannelserverSocketChannel=ServerSocketChannel.open();ser... 查看详情

tcp网络通讯如何解决分包粘包问题(有模拟代码)

TCP作为常用的网络传输协议,数据流解析是网络应用开发人员永远绕不开的一个问题。TCP数据传输是以无边界的数据流传输形式,所谓无边界是指数据发送端发送的字节数,在数据接收端接受时并不一定等于发送的字节数,可能... 查看详情

使用newlife网络库构建可靠的自动售货机socket服务端(代码片段)

...windows服务为宿主的方式来运行最为可靠,那么我们可以使用TopShelf来构建这样一个服务宿主 2,实现设备的在线状态实时可知,我们需要定时的去判断这个设备是否在定时给服务 查看详情

网络编程之粘包(代码片段)

...发送给接收方3.接收方通过此长度准确接收消息上述操作使用了第三方模块structstruct的作用,pack方法将一个整数,转换成固定长度的bytes类型unpack方法将一个bytes类型的长度转换为整数解决粘包问题的核心,给数据流添加自定义... 查看详情

网络编程之粘包问题使用socketserver实现并发(代码片段)

一、粘包问题注意:粘包问题只有tcp协议并且udp协议永远不会粘包粘包问题的产生:简述:粘包问题的产生主要是由于tcp协议传输数据(其内置的nagle算法来进行的)会将数据较小的且发送时间较短的合并成一个包从发送端发送... 查看详情

libevent粘包分包解决方案:bufferevent+evbuffer(代码片段)

...触发的网络库,适用于windows、linux、bsd等多种平台,内部使用select、epoll、kqueue等系统调用管理事件机制。著名分布式缓存软件memca 查看详情

socket的粘包问题解决方案

粘包:由于接受recv有最大限制,管道中有大于最大限制字节时, 第二次recv的会是之前残留的信息,这种现象叫做粘包。TCP协议是面向连接的,面向流的,当在发送数据时接受方不知道要收多少字节的数据,但由于缓存区大... 查看详情

基于tcp协议下粘包现象和解决方案(代码片段)

...冲区,输入缓冲区和输出缓冲区。write()/send()并不立即向网络中传输数据,而是先将数据写入缓冲区中,再由TCP协议将数据从缓冲区发送到目标机器。一旦将数据写入到缓冲区,函数就可以成功返回,不管它们有没有到达目标机... 查看详情

解决粘包(代码片段)

...果的存放位置stderr=subprocess.PIPE#错误结果的存放位置)#从管道里面拿出结果,通过s 查看详情

关于粘包的解决方法(代码片段)

...道还点进来了,看来是真爱了。就是字面意思,网络通信中数据包粘在一起了。为什么会产生粘包?对于TCP:一次一个小包太慢了吧!!!大包也就算了&#x 查看详情

newlife.xcode上手指南(代码片段)

...作数据库,解决90%以上的数据库操作场景.是大石头和他的NewLife团队10年来的智慧结晶,感谢前人栽树!    & 查看详情

tcp粘包以及解决方案(代码片段)

...发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端,就难于分辨出来了,必须提供科学 查看详情

netty_03_bytebuf和网络中拆包粘包问题及其解决(代码片段)

文章目录一、前言二、ByteBuf(NettyAPI中定义的数据类型)2.1ByteBuf2.1.1ByteBuf创建的方法有两种2.1.2ByteBuf的存储结构2.1.3ByteBuf中常用的方法APIReader相关方法Write相关方法Write可能导致扩容2.2ByteBuf代码(演示读写指针移动和扩容)... 查看详情

网络编程之粘包(代码片段)

一、什么是粘包须知:只有TCP有粘包现象,UDP永远不会粘包粘包不一定会发生TCP发生粘包的两种情况:      1.由于Nagle算法,将多次间隔小且数量小的数据,合并成一个数据块      2.数据量发送大,接受少首先需... 查看详情

netty_03_bytebuf和网络中拆包粘包问题及其解决(代码片段)

...wappedBuffer)2.5.1Unpooled.compositeBuffer2.5.2Unpooled.wrappedBuffer三、网络传输中的拆包粘包问题3.1拆包粘包问题3.2应用层定义通信协议3.2.1消息长度固定3.2.2特定分隔符3.2.3消息长度加消息内容加分隔符四、Netty解决网络传输中的拆包粘包问... 查看详情