8.基于netty实现群聊,心跳检测(代码片段)

PacosonSWJTU PacosonSWJTU     2022-12-08     159

关键词:

【README】

1.本文总结自B站《netty-尚硅谷》,很不错;

2.本文po出了 Unpooled创建缓冲区的 代码示例;

3.本文示例代码基于netty实现以下功能:

  • 群聊客户端及服务器;
  • 心跳检测;

【1】Unpooled创建缓冲区

Unpooled定义:

  • 是Netty 提供的一个专门用来操作缓冲区(即Netty的数据容器)的工具

【1.1】Unpooled.buffer-申请给定容量的缓冲区

1)Unpooled.buffer(capacity) 定义:

public static ByteBuf buffer(int initialCapacity) 
        return ALLOC.heapBuffer(initialCapacity);
    

代码示例 :

public class NettyByteBuf61 
    public static void main(String[] args) 
        // 创建一个对象,该对象包含一个数组 byte[10]
        // 在netty buf中,不需要像nio那样 执行flip 切换读写模式
        // 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置
        ByteBuf byteBuf = Unpooled.buffer(10);
        for (int i = 0; i < 10; i++) 
            byteBuf.writeByte(i); // writerIndex 自增
        
        // 输出
        for (int i = 0; i < byteBuf.capacity(); i++) 
            System.out.printf(byteBuf.readByte() + " "); // readerIndex 自增
//            System.out.println(byteBuf.getByte(i));
        
        // 查看 byteBuf 的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)
        System.out.println(byteBuf);
    

【代码解说】

  • 在netty buf中,不需要像nio那样 执行flip 切换读写模式;
  • 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置;

运行结果:

0 1 2 3 4 5 6 7 8 9
UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)


【1.2】Unpooled.copiedBuffer() 创建buf 缓冲区

copiedBuffer(CharSequence string, Charset charset) 定义:

  • 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于NIO中的ByteBuffer但有区别)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset) 
        if (string == null) 
            throw new NullPointerException("string");
        

        if (string instanceof CharBuffer) 
            return copiedBuffer((CharBuffer) string, charset);
        

        return copiedBuffer(CharBuffer.wrap(string), charset);
    

代码示例

public class NettyByteBuf62 
    public static void main(String[] args) 
        // 通过 Unpooled.copiedBuffer  创建 buf缓冲区
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", StandardCharsets.UTF_8);
        // 1 使用相关方法-byteBuf.hasArray()
        if (byteBuf.hasArray()) 
            String content = new String(byteBuf.array(), StandardCharsets.UTF_8);
            System.out.println(content);
            // 查看ByteBuf的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 24)
            System.out.println("bytebuf = " + byteBuf);
            // 查看偏移量
            System.out.println("byteBuf.arrayOffset() = " + byteBuf.arrayOffset()); // 0
            // 查看 readerIndex
            System.out.println("byteBuf.readerIndex() = " + byteBuf.readerIndex()); // 0
            // 查看 writerIndex
            System.out.println("byteBuf.writerIndex() = " + byteBuf.writerIndex()); // 12
            // 查看 capacity
            System.out.println("byteBuf.capacity() = " + byteBuf.capacity());
            // 查看可读取的字节数量 12
            System.out.println("byteBuf.readableBytes() = " + byteBuf.readableBytes());
            // 使用for循环读取byteBuf
            for (int i = 0; i < byteBuf.readableBytes(); i++) 
                System.out.print((char)byteBuf.getByte(i));
            
            System.out.println();
            // 读取 byteBuf 其中某一段,从下标4开始,读取6个字节
            CharSequence charSequence = byteBuf.getCharSequence(4, 6, StandardCharsets.UTF_8);
            System.out.println(charSequence);
        
    

运行结果:

hello world                      
bytebuf = UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33)
byteBuf.arrayOffset() = 0
byteBuf.readerIndex() = 0
byteBuf.writerIndex() = 11
byteBuf.capacity() = 33
byteBuf.readableBytes() = 11
hello world
o worl


【2】netty群聊客户端与服务器

需求描述:

  1. 基于Netty 实现 多人群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2.  服务器端:可以监测用户上线,离线,并实现消息转发功能;
  3.  客户端: 通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(由服务器转发得到);

【2.1】netty服务器

1)群聊服务器代码

/**
 * @Description netty群聊服务器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月03日
 */
public class NettyGroupChatServer63 

    private int port;

    public NettyGroupChatServer63(int port) 
        this.port = port;
    

    public static void main(String[] args) 
        try 
            new NettyGroupChatServer63(8089).run();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

    public void run() throws InterruptedException 
        // 创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            // 服务器启动引导对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() 
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception 
                            // 获取pipeline
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 添加解码处理器 编码器
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            // 添加业务处理handler
                            pipeline.addLast(new NettyGroupChatServerHandler());
                        
                    );
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            System.out.println("netty服务器启动成功");
            // 监听关闭
            channelFuture.channel().closeFuture().sync();
         finally 
            // 优雅关闭线程
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        

    

2)群聊服务器处理器

/**
 * @Description netty服务器处理器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月03日
 */
public class NettyGroupChatServerHandler extends SimpleChannelInboundHandler<String> 

    // 定义一个 channel 组,用于管理channel
    // GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 读取数据并转发
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception 
        // 获取当前channel
        Channel channel = ctx.channel();
        // 遍历 channelGroup, 根据不同情况 回送不同消息
        channelGroup.forEach(otherChannel-> 
            if (channel != otherChannel)  // 非当前channel, 直接转发
                otherChannel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户 " + channel.remoteAddress() + "说:" + msg + "\\n");
             else  // 回显自己发送的消息给自己
                channel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "自己说:" + msg + "\\n");
            
        );
    

    // 一旦连接建立,第一个被执行
    // 将当前channel 添加到channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception 
        Channel channel = ctx.channel();
        // 把客户端加入群组的信息发送到其他客户端
        channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 加入聊天");
        // 把当前channel 添加到 channel 组
        channelGroup.add(channel);
    

    // 表示 channel 处于活动状态, 提示 xx 上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 上线了");
    

    // 表示 channel 处于离线状态, 提示 xx 离线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 
        System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 离开了");
    

    // 断开连接,把xx客户离开的信息推送给其他在线客户
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 离开了");
        System.out.println("channelGroup.size() = " + channelGroup.size());
    

    // 发送异常如何处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        // 关闭通道
        ctx.close();
    

【2.2】netty客户端

1)群聊客户端代码:

/**
 * @Description netty群聊客户端
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月03日
 */
public class NettyGroupChatClient64 

    /** 主机和端口 */
    private final String host;
    private final int port;

    /**
     * @description 构造器
     * @author xiao tang
     * @date 2022/9/3
     */
    public NettyGroupChatClient64(String host, int port) 
        this.host = host;
        this.port = port;
    

    public static void main(String[] args) 
        try 
            new NettyGroupChatClient64("127.0.0.1", 8089).run();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

    public void run() throws InterruptedException 
        // 事件运行的线程池
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try 
            // 客户端启动引导对象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() 
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception 
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 添加解码器 编码器
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            // 添加业务逻辑的 handler
                            pipeline.addLast(new NettyGroupChatClietnHandler());
                        
                    );
            //  连接给定主机的端口,阻塞直到连接成功
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // 得到 channel
            Channel channel = channelFuture.channel();
            System.out.println("----------" + channel.localAddress() + "----------");
            // 客户端需要输入信息,创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) 
                String msg = scanner.nextLine();
                // 通过channel 发送到服务器
                channel.writeAndFlush(msg);
            
         finally 
            // 关闭线程池,释放所有资源,阻塞直到关闭成功
            eventExecutors.shutdownGracefully().sync();
        

    

2)群聊客户端处理器代码:

/**
 * @Description netty群聊客户端处理器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月03日
 */
public class NettyGroupChatClietnHandler extends SimpleChannelInboundHandler<String> 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception 
        System.out.println(msg.trim());
    

【2.3】 运行结果:

1)服务器与客户端: 服务器1个,客户端3个;

2)客户端离线:

 


【3】netty心跳检测

【3.1】netty心跳检测概述

1)netty定义的空闲状态事件:

Triggers an @link IdleStateEvent when a @link Channel has not performed
* read, write, or both operation for a while.

当一个通道一段时间内没有执行 读,写,或读写操作时,就会触发 IdleStateEvent事件

2)需求描述:

  1. 编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲;
  2. 当服务器超过5秒没有写操作时,就提示写空闲;
  3.  实现当服务器超过7秒没有读或者写操作时,就提示读写空闲;

【3.2】netty心跳检测代码实现

1)netty心跳检测服务器

/**
 * @Description netty心跳检测服务器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月03日
 */
public class NettyHeartbeatCheckServer66 
    public static void main(String[] args) 
        try 
            new NettyHeartbeatCheckServer66().run();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

    public void run() throws InterruptedException 
        // 创建线程池执行器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try 
            // 服务器启动引导对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() 
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception 
                            // 添加处理器
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 1. 添加空闲状态处理器 :
                                // readerIdleTime: 表示多长时间没有读入io事件,就会发送一个心跳检测包,检测是否连接状态
                                // writerIdleTime: 表示多长时间没有写出io事件,就会发送一个心跳检测包,检测是否连接状态
                                // allIdleTime:   表示多长时间没有读入和写出io事件,就会发送一个心跳检测包,检测是否连接状态
                            //  2. 文档说明
                            // Triggers an @link IdleStateEvent  when a @link Channel has not performed
                            //     * read, write, or both operation for a while.
                            // 3. 当 IdleStateEvent 事件触发后, 就会传递给管道的 下一个处理器 去处理
                                  // 通过调用下一个handler的 userEventTriggered 方法,即在该方法中处理IdleStateEvent 事件;
                            pipeline.addLast(new IdleStateHandler(4, 5,  7, TimeUnit.SECONDS));
                            // 添加一个对空闲检测 进一步处理的handler(自定义 )
                            pipeline.addLast(new NettyHeartbeatCheckServerHandler());
                        
                    );
            // 启动服务器,监听端口,阻塞直到启动成功
            ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();
            // 阻塞直到channel关闭
            channelFuture.channel().closeFuture().sync();
         finally 
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        
    

2)netty心跳检测服务器处理器

/**
 * @Description netty心跳检测服务器处理器
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年09月04日
 */
public class NettyHeartbeatCheckServerHandler extends ChannelInboundHandlerAdapter 
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception 
        if (evt instanceof IdleStateEvent) 
            IdleStateEvent event2 = (IdleStateEvent) evt;
            String eventType = ""; // 事件类型
            switch (event2.state()) 
                case READER_IDLE: eventType = "读空闲"; break;
                case WRITER_IDLE: eventType = "写空闲"; break;
                case ALL_IDLE: eventType = "读写空闲"; break;
            
            System.out.println("客户端" + ctx.channel().remoteAddress() + "--超时事件--" + eventType);
            System.out.println("服务器做相应处理");
            // 如果发生空闲,马上关闭通道
//            System.out.println("一旦发生超时事件,则关闭 channel");
//            ctx.channel().close();
        
    

【3.3】运行结果:

1)以 NettyGroupChatClient64 作为客户端连接到 服务器 NettyHeartbeatCheckServer66;

2)打印结果如下:

// 控制台打印结果
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理

网络i/o编程模型17netty框架实现心跳机制(代码片段)

一心跳机制说明IdleStateHandler是netty提供的处理空闲状态的处理器longreaderIdleTime:表示多长时间没有读,就会发送一个心跳,检测包检测是否连接。longwriteIdleTime:表示多长时间没有写,就会发送一个心跳,检测包检测... 查看详情

十.netty入门到超神系列-基于netty群聊系统(代码片段)

前言本章节基于Netty做一个聊天室案例加强Netty的熟练度,案例的效果是服务端可以广播某客户端的消息给所有客户端。每个客户端监听键盘输入来获取消息,然后发送给服务端。服务端服务端一样的需要创建BossGroup和Work... 查看详情

网络i/o编程模型16netty框架实现的群聊系统(代码片段)

一背景描述1.编写一下群聊系统:实现服务器端和客户端之间数据通讯(非阻塞模式)服务端:可以检测用户上线,离线,并实现消息转发功能。客户端:通过channel可以无阻塞发送消息给其他所用用户&... 查看详情

4.基于nio的群聊系统(代码片段)

【README】1.本文总结自B站《netty-尚硅谷》,很不错;2.文末有错误及解决方法;【1】群聊需求1)编写一个NIO群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)2)实现多人群聊;3... 查看详情

利用netty开发webscoketclient(支持wss协议,客户端服务端心跳实现)(代码片段)

这里写目录标题前言题外话webScoketClient实现方式一(jacva_webscoket)webScoketClient工具类简单编写测试webScoketClient实现方式二(netty)客户端初始化配置客户端的handler处理器逻辑http协议连接测试wss协议连接测试附页~... 查看详情

java实现心跳检测长连接(代码片段)

...2、心跳机制实现方式心跳机制有两种实现方式,一种基于TCP自带的心 查看详情

java基于socket实现聊天群聊敏感词汇过滤功能(代码片段)

首先的话,这个代码主要是我很久以前写的,然后当时还有很多地方没有理解,现在再来看看这份代码,实在是觉得丑陋不堪,想改,但是是真的改都不好改了…所以,写代码,规范真的很重要。... 查看详情

rpc----基于netty实现的rpc(代码片段)

基于Netty实现的RPC一、Netty服务端和客户端1、服务端server1.1NettyServer1.2NettyServerHandler2、客户端client2.1NettyClient2.2NettyClientHandler二、自定义协议和编解码器1、协议2、编码器3、解码器4、拆包器5、补充知识5.1TCP粘包拆包问题三、序列... 查看详情

8★☆基于stm32的小区环境检测系统√★☆(代码片段)

8、★☆基于STM32的小区环境检测系统√★☆文章目录8、★☆基于STM32的小区环境检测系统√★☆Introduction引言1、系统概述1.1、设计任务1.2、设计要求2、方案设计与论证2.1、芯片选择方案2.2、系统概述2.3、设计要求2.4、系统总体... 查看详情

8★☆基于stm32的小区环境检测系统√★☆(代码片段)

8、★☆基于STM32的小区环境检测系统√★☆文章目录8、★☆基于STM32的小区环境检测系统√★☆Introduction引言1、系统概述1.1、设计任务1.2、设计要求2、方案设计与论证2.1、芯片选择方案2.2、系统概述2.3、设计要求2.4、系统总体... 查看详情

使用websocket实现消息推送(心跳)(代码片段)

...是否可用的,不一定支持携带数据,可要看具体实现如果非要心跳中带上复杂数据,那这个可作为应用层的一个功能自己去实现。0x 查看详情

基于netty的一个简单的时间服务器的实现(netty学习)(代码片段)

  基本功能:与客户端建立连接后立刻发送当前时间先建立一个时间的类packagetimeExample;importjava.sql.Date;publicclassUnixTimeprivatefinallongvalue;publicUnixTime()this(System.currentTimeMillis()/1000L);publicUnixTime(long 查看详情

利用netty开发webscoketclient(支持wss协议,客户端服务端心跳实现)(代码片段)

这里写目录标题前言题外话webScoketClient实现方式一(jacva_webscoket)webScoketClient工具类简单编写测试webScoketClient实现方式二(netty)客户端初始化配置客户端的handler处理器逻辑http协议连接测试wss协议连接测试附页~... 查看详情

利用netty开发webscoketclient(支持wss协议,客户端服务端心跳实现)(代码片段)

这里写目录标题前言题外话webScoketClient实现方式一(jacva_webscoket)webScoketClient工具类简单编写测试webScoketClient实现方式二(netty)客户端初始化配置客户端的handler处理器逻辑http协议连接测试wss协议连接测试附页~... 查看详情

9.基于netty实现websocket服务器(代码片段)

...自B站《netty-尚硅谷》,很不错;2.本文示例代码基于netty实现WebSocket服务器功能;其中,html作为WebSocket客户端;3.WebSocket协议介绍: 对于WebSocket,它的数据是以帧frame的形式传递的; 可以看到WebS 查看详情

netty中的心跳机制,还有谁不会?(代码片段)

...们知道在TCP长连接或者WebSocket长连接中一般我们都会使用心跳机制–即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。那么心跳机制可以用来做什么呢?我们知道网络的传输是不可靠的,当我们发起一个链... 查看详情

在netty的基础下如何搭建im即时通讯集群

...和条件,所以前段时间,笔者利用业余时间,基于Netty开发了一套基本功能比较完善的IM系统。该系统支持私聊、群聊、会话管理、心跳检测&#x 查看详情

基于netty和springboot实现一个轻量级rpc框架-client篇(代码片段)

前提前置文章:《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》前一篇文章相对简略地介绍了RPC服务端的编写,而这篇博文最要介绍服务端(Client)的实现。RPC调用一般... 查看详情