springboot使用websocket实现服务端推送--集群实现

恒奇恒毅 恒奇恒毅     2023-03-22     339

关键词:

书接上文,本文介绍了一种实现集群管理和消息传送方式。

在集群模式情况下,一般是Nginx反向代理到多台Tomcat或者SLB代理到多台Tomcat的方式,怎么实现给某个人推送消息?比如WebSocket1连接到Tomcat1,但是在Tomcat2需要给WebSocket1发送消息,怎么办?一般的想法是像httpsession的集群处理方式一样,利用一个中间件Redis来保存session即可。但是实际测试才发现,根本不可取,因为WebSocket的session是有状态的,并且无法序列化,在往redis中保存的时候就抛异常了。通过查询资料,发现可以通过Redis的发布订阅模式来实现。其基本原理是:初始化的时候都订阅某个频道,Tomcat只管理连接到我的WebSocket的session,需要给某人发布消息的时候通过Redis发布一个消息,所有订阅了该频道的Tomcat都能接收到该消息,根据此消息找WebSocket,能找到就发送消息,不能找到忽略即可。

具体实现:

1.首先实现WebSocketManager,因为连接到本机的WebSocket还是本机管理,所以继承于MemWebSocketManager实现,只需要实现发送消息相关方法。

/**
 * WebSocket的session无法序列化,所以session还是保存在本地内存中,发送消息这种就走订阅发布模式
 * 1.redis或者mq进行发布订阅,广播->有某个节点能找到此人就发送消息,其他的忽略
 * 2.Nginx进行IP hash 可以使用@link MemWebSocketManager
 * @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class RedisWebSocketManager extends MemWebSocketManager 
    public static final String CHANNEL    = "websocket";
    private static final String COUNT_KEY = "CountKey";
    private StringRedisTemplate stringRedisTemplate;

    public RedisWebSocketManager(StringRedisTemplate stringRedisTemplate) 
        this.stringRedisTemplate = stringRedisTemplate;
    


    @Override
    public void put(String identifier, WebSocket webSocket) 
        super.put(identifier, webSocket);
        //在线数量加1
        countChange(1);
    

    @Override
    public void remove(String identifier) 
        super.remove(identifier);
        //在线数量减1
        countChange(-1);
    

    @Override
    public int size() 
        return getCount();
    

    @Override
    public void sendMessage(String identifier, String message) 
        WebSocket webSocket = get(identifier);
        //本地能找到就直接发
        if(null != webSocket && WebSocket.STATUS_AVAILABLE == webSocket.getStatus())
            WebSocketUtil.sendMessage(webSocket.getSession() , message);
            return;
        


        Map<String , Object> map = new HashMap<>(3);
        map.put(RedisReceiver.ACTION , ActionFactory.ACTION_SEND_MESSAGE);
        map.put(RedisReceiver.IDENTIFIER , identifier);
        map.put("message" , message);
        //在websocket频道上发布发送消息的消息
        stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map));
    

    @Override
    public void broadcast(String message) 
        Map<String , Object> map = new HashMap<>(2);
        map.put(RedisReceiver.ACTION , ActionFactory.ACTION_BROADCAST_MESSAGE);
        map.put("message" , message);
        //在websocket频道上发布广播的消息
        stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map));
    

    @Override
    public void changeStatus(String identifier, int status) 
        WebSocket webSocket = get(identifier);
        if(null != webSocket)
            webSocket.setStatus(status);
            return;
        
        Map<String , Object> map = new HashMap<>(3);
        map.put(RedisReceiver.ACTION , ActionFactory.ACTION_CHANGE_STATUS);
        map.put(RedisReceiver.IDENTIFIER , identifier);
        map.put("status" , status);
        //在websocket频道上发布改变状态的消息
        stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map));
    

    /**
     * 增减在线数量
     */
    private void countChange(int delta)
        ValueOperations<String, String> value = stringRedisTemplate.opsForValue();

        //获取在线当前数量
        int count = getCount(value);

        count = count + delta;
        count = count > 0 ? count : 0;

        //设置新的数量
        value.set(COUNT_KEY , "" + count);
    

    /**
     * 获取当前在线数量
     */
    private int getCount()
        ValueOperations<String, String> value = stringRedisTemplate.opsForValue();
        return getCount(value);
    
    private int getCount(ValueOperations<String, String> value) 
        String countStr = value.get(COUNT_KEY);
        int count = 0;
        if(null != countStr)
            count = Integer.parseInt(countStr);
        
        return count;
    

该类中,重写了所有需要操作某个session的方法,在这些方法中指定不同的操作Action及带上相应的数据。

2.订阅者收到改消息及能做出不同的动作,订阅者持有WebSocketManager,可以操作相关的session。

/**
 * redis消息订阅者
 * @author xiongshiyan
 */
public class RedisReceiver 
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisReceiver.class);
    public static final String IDENTIFIER = "identifier";
    public static final String ACTION     = "action";

    private CountDownLatch latch;
    private WebSocketManager webSocketManager;

    public RedisReceiver(WebSocketManager webSocketManager, CountDownLatch latch) 
        this.webSocketManager = webSocketManager;
        this.latch = latch;
    

    /**
     * 此方法会被反射调用
     */
    public void receiveMessage(String message) 
        LOGGER.info(message);

        JSONObject object = new JSONObject(message);
        if(!object.containsKey(ACTION))
            return;
        
        String actionString = object.getString(ACTION);
        Action action = ActionFactory.create(actionString);
        action.doMessage(this.webSocketManager , object);

        //接收到消息要做的事情
        latch.countDown();
    

消息订阅者接收到消息之后,根据Action得到操作类,实现不同的操作,如果以后有更多的功能,那么添加相应的Action类即可。

/**
 * @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class ActionFactory 
    public static final String ACTION_SEND_MESSAGE       = "sendMessage";
    public static final String ACTION_CHANGE_STATUS      = "changeStatus";
    public static final String ACTION_BROADCAST_MESSAGE  = "broadcast";
    public static Action create(String action)
        if(ACTION_SEND_MESSAGE.equalsIgnoreCase(action))
            return new SendMessageAction();
        else if(ACTION_CHANGE_STATUS.equalsIgnoreCase(action))
            return new ChangeStatusAction();
        else if(ACTION_BROADCAST_MESSAGE.equalsIgnoreCase(action))
            return new BroadCastAction();
        else 
            return new NoActionAction();
        
    
/**
 * 
 *     "action":"sendMessage",
 *     "identifier":"xxx",
 *     "message":"xxxxxxxxxxx"
 * 
 * 给webSocket发送消息的action
 * @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class SendMessageAction implements Action
    private static final String MESSAGE = "message";
    @Override
    public void doMessage(WebSocketManager manager, JSONObject object) 
        if(!object.containsKey(RedisReceiver.IDENTIFIER))
            return;
        
        if(!object.containsKey(MESSAGE))
            return;
        

        String identifier = object.getString(RedisReceiver.IDENTIFIER);

        WebSocket webSocket = manager.get(identifier);
        if(null == webSocket || WebSocket.STATUS_AVAILABLE != webSocket.getStatus())
            return;
        
        WebSocketUtil.sendMessage(webSocket.getSession() , object.getString(MESSAGE));
    
/**
 * 
 *     "action":"changeStatus",
 *     "identifier":"xxx",
 *     "status":1
 * 
 * 改变状态的action
 * @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class ChangeStatusAction implements Action
    private static final String STATUS = "status";
    @Override
    public void doMessage(WebSocketManager manager , JSONObject object) 
        if(!object.containsKey(RedisReceiver.IDENTIFIER))
            return;
        
        if(!object.containsKey(STATUS))
            return;
        
        WebSocket webSocket = manager.get(object.getString(RedisReceiver.IDENTIFIER));
        if(null == webSocket)
            return;
        
        webSocket.setStatus(object.getInteger(STATUS));
    

/**
 * 
 *     "action":"broadcast",
 *     "message":"xxxxxxxxxxxxx"
 * 
 * 广播给所有的websocket发送消息 action
 * @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class BroadCastAction implements Action
    @Override
    public void doMessage(WebSocketManager manager, JSONObject object) 
        String message = object.getString("message");
        //从本地取出所有的websocket发送消息
        manager.localWebSocketMap().values().forEach(
                webSocket -> WebSocketUtil.sendMessage(
                        webSocket.getSession() , message));
    

/**
 * do nothing action
 * @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
 */
public class NoActionAction implements Action
    @Override
    public void doMessage(WebSocketManager manager, JSONObject object) 
        // do no thing
    

3.配置WebSocketManager和消息订阅。

@Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) 
        return new StringRedisTemplate(connectionFactory);
    
    /**
     * 使用redis管理,具备集群功能
     */
    @Bean(WebSocketManager.WEBSOCKET_MANAGER_NAME)
    public RedisWebSocketManager webSocketManager(@Autowired StringRedisTemplate stringRedisTemplate)
        return new RedisWebSocketManager(stringRedisTemplate);
    
**
 * @author xiongshiyan
 * redis管理websocket配置
 */
@Configuration
@ConditionalOnBean(RedisWebSocketManager.class)
public class RedisWebSocketConfig 
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) 

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(RedisWebSocketManager.CHANNEL));

        return container;
    

    @Bean
    public RedisReceiver receiver(
            @Autowired@Qualifier("webSocketManager") WebSocketManager webSocketManager,
            @Autowired@Qualifier("latch") CountDownLatch latch) 
        return new RedisReceiver(webSocketManager , latch);
    

    @Bean
    public MessageListenerAdapter listenerAdapter(RedisReceiver receiver) 
        return new MessageListenerAdapter(receiver, "receiveMessage");
    

    @Bean
    public CountDownLatch latch() 
        return new CountDownLatch(1);
    

    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) 
        return new StringRedisTemplate(connectionFactory);
    

至此,实现了WebSocket的集群管理。

 

项目参见 https://gitee.com/xxssyyyyssxx/websocket-springboot-starter 目前支持多ServerEndPoint和多WebSocketManager,一般情况下他们的关系是一对一的,便于管理。

使用demo见 https://gitee.com/xxssyyyyssxx/websocket-demo

springboot实现websocket(代码片段)

...一个websocket应该怎么实现,这里采用的是更加方便的Springboot的方式,如果项目中没有使用springboot框架,也是可以 查看详情

springboot实现websocket(代码片段)

...一个websocket应该怎么实现,这里采用的是更加方便的Springboot的方式,如果项目中没有使用springboot框架,也是可以 查看详情

springboot实现websocket(代码片段)

...一个websocket应该怎么实现,这里采用的是更加方便的Springboot的方式,如果项目中没有使用springboot框架,也是可以 查看详情

springboot集成websocket,轻松实现信息推送!

在一次项目开发中,使用到了Netty网络应用框架,以及MQTT进行消息数据的收发,这其中需要后台来将获取到的消息主动推送给前端,于是就使用到了MQTT,特此记录一下。一、什么是websocket?WebSocket协议是基于TCP的一种新的网络... 查看详情

springboot+websocket学习(代码片段)

Springboot+WebSocket聊天室项目WebSocket介绍WebSocket的特点webSocket协议客户端(浏览器)实现websocket对象websocket事件WebSocket方法服务端实现服务端如何接受客户端发送过来的数据呢?服务端如何推送数据给客户端呢?基于WebSocket的网页聊... 查看详情

使用springboot+layim+websocket实现webim

          使用springboot+layim+websocket实现webim 小白技术社 项目介绍采用springboot和layim构建webim,使用websocket作为通讯协议,目前已经能够正常聊天,并没有对好友的操作进行实现,查找和加好友没有实现,有需... 查看详情

springboot中使用websocket实现一对多聊天及一对一聊天(代码片段)

为什么需要WebSocket?我们已经有了http协议,为什么还需要另外一个协议?有什么好处?比如我想得到价格变化,只能是客户端想服务端发起请求,服务器返回结果,HTTP协议做不到服务器主动向客户端推送消息,这种单向请求的... 查看详情

springboot-websocket

...WebSocket的方式,原因:Spring使用WebSocket简便且易于扩展。SpringBoot使用WebSocket非常方便,依赖上仅需要添加相应的Starter即可。先给出概要的开发步骤:其实到这里,基础的websocket服务已经搭建好了,剩下的可以自己在handler与interc... 查看详情

springboot配置websocket

参考技术A既然是一个长连接,那么对于比较时效性(如聊天)或者需要推送的场景就可以使用WebSocket来实现,服务端不再是等待客户端的请求而可以主动推送消息给客户端。同时也减少了资源的开销,因为之前通过HTTP的做法通... 查看详情

springboot+websocket实时消息推送

...术A商家的后台管理系统实现新订单提醒推送功能,利用SpringBoot+WebSocket实时消息推送的方式进行实现。引入依赖,我使用的是SpringBoot版本2.2.6.RELEASE,自动管理依赖版本配置类WebSocketConfig,扫描并注册带有@ServerEndpoint注解的所有w... 查看详情

springboot使用websocket实现服务端推送--集群实现

书接上文,本文介绍了一种实现集群管理和消息传送方式。在集群模式情况下,一般是Nginx反向代理到多台Tomcat或者SLB代理到多台Tomcat的方式,怎么实现给某个人推送消息?比如WebSocket1连接到Tomcat1,但是在Tomc... 查看详情

springboot+vue+websocket实现服务器端向客户端主动发送消息

参考技术A本文通过一个实际的场景来介绍在前后端分离的项目中通过WebSocket来实现服务器端主动向客户端发送消息的应用。主要内容如下Websocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket连接成功后,服务端与客户... 查看详情

springboot-websocket实现及原理

本文章包括websocket面试相关问题以及springboot如何整合webSocket。参考文档https://blog.csdn.net/prayallforyou/article/details/53737901、https://www.cnblogs.com/bianzy/p/5822426.html  webSocket是HTML5的一种新协议,它实现了服务端与客户端的全双工通信,... 查看详情

springboot整合websocket实现即时聊天功能

...期,公司需要新增即时聊天的业务,于是用websocket整合到Springboot完成业务的实现。一、我们来简单的介绍下websocket的交互原理:1.客户端先服务端发起websocket请求;2.服务端接收到请求之后,把请求响应返回给客户端;3.客户端... 查看详情

springboot集成websocket,实现后台向前端推送信息(代码片段)

前言在一次项目开发中,使用到了Netty网络应用框架,以及MQTT进行消息数据的收发,这其中需要后台来将获取到的消息主动推送给前端,于是就使用到了MQTT,特此记录一下。一、什么是websocket?WebSocket协... 查看详情

springboot集成websocket,实现后台向前端推送信息(代码片段)

前言在一次项目开发中,使用到了Netty网络应用框架,以及MQTT进行消息数据的收发,这其中需要后台来将获取到的消息主动推送给前端,于是就使用到了MQTT,特此记录一下。一、什么是websocket?WebSocket协... 查看详情

springboot+netty+websocket实现消息推送(代码片段)

关于NettyNetty是一个利用Java的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的API的客户端/服务器框架。Maven依赖<dependencies> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency>  <groupId&g 查看详情

springboot怎么实现websocket通信

前言上一篇文章分享了单机模式下,websocket的基本使用方法,但在实际的业务中,通常是不会这样使用的,大部项目都是分布式部署的,一个工程布署了多个服务节点,前端并不直接请求具体服务节点࿰... 查看详情