消息总线重构之eventbus(代码片段)

vinoYang vinoYang     2023-02-10     777

关键词:

本人博客文章如未特别注明皆为原创!如有转载请注明出处:http://blog.csdn.net/yanghua_kobe/article/details/46699205

最近花了不少时间对消息总线进行了重构。重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景:

(1)改进广播通知

(2)业务逻辑串联,用事件驱动替代责任链模式

EventBus简介

EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读。总得来说,EventBus是观察者模型的实现,利用它你既可以实现观察者模型的业务场景,还可以基于它的事件驱动机制来实现应用程序内组件之间的解耦与通信。

改进广播通知

广播通知是消息总线提供的功能之一。在重构之前,客户端接收广播通知是通过消息总线客户端SDK的一个API来实现的:

public void setNotificationListener(IMessageReceiveListener notificationListener);

但之前的广播通知设计并不合理。它受限于之前的基于RabbitMQ的树形路由拓扑模型:


这个拓扑结构中有些只发送不接受的“虚拟队列”并不是真实存在的队列。这些消息生产者无法接收消息,这是非常大的一个缺陷。我一直在想办法重新设计它,之前的关注点都集中在RabbitMQ上,想在MQ上找到一种解决方案,但这很难,除非摈弃“虚拟队列”的设计。于是,我将关注点转移到消息总线中另一个可以提供pub/sub的组件上(后称之为pubsuber),该组件目前可以是redis也可以是zookeeper。因为每个client(更准确得说是每个创建client的pool)都会以长连接的方式挂在pubsuber上。所以,它本身就是一个很不错的广播渠道,并且因为它脱离RabbitMQ单独实现,跟虚拟队列的设计不相冲突。

上面的思路没有问题,但语义与实现上并不对等。通知的收发从语义上来说应该是Client API级别的。而PubSuber接收到的广播事件却是Pool级别的,并不依赖client(Pool创建PubSuber以及Client)。我们不应该在Pool层面上接收广播事件。因此这里存在一个事件的截获与二次转发的过程。这是我们针对EventBus的第一个应用场景:用它转发PubSuber接收到的广播通知给client。

PubSuber接收到广播消息之后通过EventBus 作二次转发:

    public class NotifyHandler implements IPubSubListener 

        @Override
        public void onChange(String channel, byte[] data, Map<String, Object> params) 
            NotifyEvent notifyEvent = new NotifyEvent();
            Message broadcastMsg = pubsuberManager.deserialize(data, Message.class);
            if (broadcastMsg != null && broadcastMsg.getMessageType().equals(MessageType.BroadcastMessage)) 
                notifyEvent.setMsg(broadcastMsg);
                getComponentEventBus().post(notifyEvent);
            
        
    

事件发布完了之后,EventBus会将其分发到该事件的订阅者处理,这里需要注意的是创建的EventBus是一个异步EventBus的实例,它在一个独立的线程上执行事件处理器方法。而所有的事件处理器都需要通过Client进行注册:

    public void registerEventProcessor(Object eventProcessor) 
        componentEventBus.register(eventProcessor);
    

以上这一步,就将消息通知跟Client关联起来。而且对多个client注册不同的事件处理器,还可以起到多播的作用(原来在Pool级别是一个事件,现在在Client级别,多个Client可以应对若干个处理器)。

EventBus通过注解来解析事件处理器与事件之间的关联关系,更多的实现细节,请参考之前的文章。下面就是订阅广播通知的方式:

    public static class NotificationEventProcessor 

        @Subscribe
        public void onNotification(NotifyEvent event) 
            logger.info("onNotification");
            Message message = event.getMsg();
            assertNotNull(message);
            assertEquals("test", new String(message.getContent(), Constants.CHARSET_OF_UTF8));
        

    

仅仅需要一个注解即可。当然最后别忘记移除注册,如果你不再希望接收通知的话,整个过程如下:

    public void testBroadcast() throws Exception 
        String secret = "kljasdoifqoikjhhhqwhebasdfasdf";

        Message msg = MessageFactory.createMessage(MessageType.BroadcastMessage);
        msg.setContentType("text/plain");
        msg.setContentEncoding("utf-8");

        msg.setContent("test".getBytes(Constants.CHARSET_OF_UTF8));

        NotificationEventProcessor eventProcessor = new NotificationEventProcessor();
        client.registerEventProcessor(eventProcessor);

        client.broadcast(secret, new Message[]msg);

        TimeUnit.SECONDS.sleep(10);

        client.unregisterEventProcessor(eventProcessor);
    

这样,原先的拓扑结构就不再包含广播通知的实现了:


事件驱动替代责任链模式

客户端跟消息总线的一次通信,需要经历多个业务逻辑环节。这些业务逻辑有些有顺序关系,有些没有。我们希望将逻辑进行拆分、自由组合搭配并且能够互不干扰得扩展。在此之前的实现基于责任链模式,有一点问题:当长连接消费时,因为真正的消费通常是chain的最后一个调用(方式是:阻塞,一直等到超过设定的时间),所以整个递归链都阻在最后一个调用。而递归调用的实现是基于栈,因此如果最后一个调用不返回(很多时候这种长连接的生命周期跟应用的生命周期相同),整个调用链以及调用中的局部变量一直都不被释放,某种程度上这有点像内存泄露了。这个问题,我曾在之前的文章中探讨过,但一直没找到太好的解决方法,除非我们放弃使用责任链模式。

但基于同步事件驱动的方式似乎能起到跟责任链模式一样的效果。它通过事件分发来驱动业务逻辑调用。将chain的每一个调用都看做是一个事件处理方法,一个单向通信逻辑(比如produce)对应一个事件处理器(produceEventProcessor)。因为此处的EventBus是同步的(事件处理逻辑在调用线程上执行,执行顺序跟事件发生的顺序相同),所以只要编排好事件顺序,一一触发事件,事件处理器也就会一一按照事件触发的顺序执行。

我们以消息生产者来看一下通过EventBus改造后的业务逻辑是什么样子。

首先我们定义一个生产消息的事件处理器:

public class ProduceEventProcessor extends CommonEventProcessor 


为了使得逻辑关系紧凑,我们将事件以内部类的方式定义在生产消息的事件处理器内部:

//region events definition
    public static class ValidateEvent extends CarryEvent 
    


    public static class PermissionCheckEvent extends CarryEvent 
    

    public static class ProduceEvent extends CarryEvent 
    
    //endregion

定义每个事件的事件处理方法:

@Subscribe
public void onValidate(ValidateEvent event) 


@Subscribe
public void onPermissionCheckEvent(PermissionCheckEvent event) 


@Subscribe
public void onProduce(ProduceEvent event) 


在client被调用以生产消息时,首先创建该事件处理器的实例,然后向EventBus注册事件处理器:

        EventBus carryEventBus = this.getContext().getCarryEventBus();

        //register event processor
        ProduceEventProcessor eventProcessor = new ProduceEventProcessor();
        carryEventBus.register(eventProcessor);

只有注册了该实例,在发布事件时,才会触发该实例的事件处理方法。注册完成该实例之后,需要初始化事件对象,这里事件之间以及事件处理器之间没有必然联系,我们以一个消息上下文对象的引用来让它们以共享“内存”的方式进行数据交换:

        //init events
        ProduceEventProcessor.ValidateEvent validateEvent = new ProduceEventProcessor.ValidateEvent();
        ProduceEventProcessor.MsgBodySizeCheckEvent msgBodySizeCheckEvent = new ProduceEventProcessor.MsgBodySizeCheckEvent();
        ProduceEventProcessor.PermissionCheckEvent permissionCheckEvent = new ProduceEventProcessor.PermissionCheckEvent();
        ProduceEventProcessor.MsgIdGenerateEvent msgIdGenerateEvent = new ProduceEventProcessor.MsgIdGenerateEvent();
        ProduceEventProcessor.MsgBodyCompressEvent msgBodyCompressEvent = new ProduceEventProcessor.MsgBodyCompressEvent();
        ProduceEventProcessor.ProduceEvent produceEvent = new ProduceEventProcessor.ProduceEvent();

        validateEvent.setMessageContext(ctx);
        msgBodySizeCheckEvent.setMessageContext(ctx);
        permissionCheckEvent.setMessageContext(ctx);
        msgIdGenerateEvent.setMessageContext(ctx);
        msgBodyCompressEvent.setMessageContext(ctx);
        produceEvent.setMessageContext(ctx);

准备工作就绪,现在开始发布事件。这里事件的发布顺序跟执行顺序是一致的,所以我们需要根据业务逻辑来编排事件,以形成原先的串联调用的效果:

//arrange event order and emit!
carryEventBus.post(validateEvent);
carryEventBus.post(msgBodySizeCheckEvent);
carryEventBus.post(permissionCheckEvent);
carryEventBus.post(msgIdGenerateEvent);
carryEventBus.post(msgBodyCompressEvent);
carryEventBus.post(produceEvent);

这就是重构的整个过程。我们发现这里不再存在链式(递归)调用了,各个事件处理器方法之间也没有耦合性,它们通过MessageContext来共享上下文。如果我们要增加新的业务逻辑,如何扩展?四步走:

(1)定义一个新事件对象

(2)定义一个新的事件处理器方法

(3)实例化该事件对象

(4)根据需要插入原先的编排过的事件中去并发布该事件

跟原先的事件没有任何关系。

更多实现,可以查看项目源码:banyan

googleguava的消息总线:eventbus,java(代码片段)

googleguava的消息总线:EventBus,Javapom.xml添加引用:<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</ve 查看详情

googleguava的消息总线:eventbus,java(代码片段)

googleguava的消息总线:EventBus,Javapom.xml添加引用:<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</ve 查看详情

eventbus3.0使用与源码分析(代码片段)

EventBus简介EventBusisapublish/subscribeeventbusoptimizedforAndroid.EventBus是一个基于发布/订阅模式的事件总线。其模型图如下从图可知,EventBus分为四个角色,消息发布者、事件总线、事件、消息订阅者。消息发布者把Event(消息)post(... 查看详情

eventbus应用(代码片段)

EventBus作为消息总线,通过解耦发布者和订阅者简化事件传递本文实现一个handler演示如何使用EventBus定义消息类型对象EventMsgpackagecom.binfoo.observer.event;/***定义消息类型对象*/publicclassEventMsgprivateStringmsg;publicEventMsg()publicEventMsg(Stringm... 查看详情

shashlik.eventbus.net事件总线,分布式事务最终一致性简介(代码片段)

...关键字,在此不会过多阐述相关的理论知识。Shashlik.EventBus就是一个基于.NET6的开源事件总线解决方案,同时也是分布式事务最终一致性、延迟事件解决方案。Shashlik.EventBus采用的是异步确保的思路(本地消息表)&... 查看详情

android消息传递之基于rxjava实现一个eventbus-rxbus

...:  上篇文章学习了Android事件总线管理开源框架EventBus,EventBus的出现大大降低了开发成本以及开发难度,今天我们就利用目前大红大紫的RxJava来实现一下类似EventBus事件总线管理,现在很多人都在说用这种方式来替代Event... 查看详情

eventbus3.0使用与源码分析(代码片段)

EventBus简介EventBusisapublish/subscribeeventbusoptimizedforAndroid.EventBus是一个基于发布/订阅模式的事件总线。其模型图如下从图可知,EventBus分为四个角色,消息发布者、事件总线、事件、消息订阅者。消息发布者把Event(消息)post(... 查看详情

eventbus事件总线框架(发布者/订阅者模式,观察者模式)(代码片段)

...,广播方式。二、单例比较好的写法: privatestaticvolatileEventBusdefaultInstance; 构造函数应当是private,不应该是public1pub 查看详情

eventbuseventbus事件总线框架简介(eventbus使用流程)(代码片段)

文章目录一、EventBus事件总线框架简介二、EventBus使用流程一、EventBus事件总线框架简介Android中的事件传递机制:使用Intent在组件间传递信息;使用BroadcastReceiver跨进程传递数据;使用Handler跨线程通信;使用接口回调机制,Activity与Fragmen... 查看详情

手动实现事件总线框架eventbus(代码片段)

...)super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);EventBus.register(this);@OverrideprotectedvoidonDestroy()super.onDestroy();EventBus.unRegister(this);发送、接收EventBus.post(newBtn3EventBean("msg:按钮3的消息"));@Subscribepublicvoidbtn3Event1(Btn3Ev... 查看详情

自己动手写事件总线(eventbus)(代码片段)

本文由云+社区发表事件总线核心逻辑的实现。EventBus的作用Android中存在各种通信场景,如Activity之间的跳转,Activity与Fragment以及其他组件之间的交互,以及在某个耗时操作(如请求网络)之后的callback回调等,互相之之间往往需... 查看详情

重复造轮子之livedatabus与pageeventbus(代码片段)

背景Android的消息总线框架近几年流行莫过于的EventBus,RxBus,一般来讲它已经足够好用,简洁、解耦,我们能够很方便的进行消息传递,那为什么我们现在又要再造一个消息总线的框架的轮子呢?这就要说... 查看详情

abp之事件总线(代码片段)

...为什么要使用IOC?IOC是用来代替反射的。那么反射在我们EventBus中有什么功能?反射是用来创建 查看详情

eshoponcontainers学习系列:rabbitmq消息总线实践(代码片段)

...实现发布订阅。新建API项目 RabbitMQ_Bus_Test,类库 EventBus、EventBusRabbitMQ,这两个类库中将会实现消息总线最主 查看详情

消息总线扩展之集成thrift-rpc(代码片段)

本文主要探讨了消息总线支持ThriftRPC的实现过程。鉴于RabbitMQ官方的JavaClient提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API。然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时... 查看详情

eventbus简单使用(代码片段)

EventBus是一个开源的事件总线框架,可方便的实现事件消息的收发,可方便的替代handler,asyc等,实现程序的解耦合。EventBusgithub地址:https://github.com/greenrobot/EventBusEventbus官方网址:http://greenrobot.org/eventbus/Ev 查看详情

eventbus/eventqueue再思考(代码片段)

EventBus/EventQueue再思考Intro之前写过两篇文章,造轮子系列的EventBus/EventQueue,回想起来觉得当前的想法有点问题,当时对EvenStore可能有点误解,有兴趣可以参考https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html/https://www.cnblogs.com... 查看详情

事件总线功能库,reface.eventbus详细使用教程(代码片段)

Reface.AppStarter中的事件总线功能是通过Reface.EventBus提供的。参考文章:Reface.AppStarter框架初探使用Reface.EventBus,你可以在Reface.AppStarter框架外使用事件总线的功能。Reface.EventBus提供了两个版本的包Reface.EventBus、Reface.Core.EventBus,分别... 查看详情