.netcorewith微服务-使用agiledt快速实现基于可靠消息的分布式事务

dotNET跨平台 dotNET跨平台     2023-01-15     287

关键词:

前面对于分布式事务也讲了好几篇了(可靠消息最终一致性分布式事务 - TCC分布式事务 - 2PC、3PC
https://github.com/kklldog/AgileDT 开源不易,大家多多 ✨✨✨

回顾

前面一篇文章(可靠消息最终一致性 )我们详细介绍了基于可靠消息的分布式事务。为了更好的理解 AgileDT 的代码,我们还是有必要简单的来回顾下。

该方案总体流程上可分为以下步骤:

  1. 主动方在真正的业务开始前先向可靠消息服务发送一个“待确认”的消息

  2. 可靠消息服务收到待确认消息后持久化消息到数据库

  3. 如果以上操作成功则主动方开始真正的业务,如果失败则直接放弃执行业务

  4. 如果业务执行成功则发送“确认”消息给可靠消息服务,如果执行失败则发送“取消”给可靠消息服务。

  5. 如果可靠消息服务收到“确认”消息则更新数据库里的消息记录的状态为“待发送”,如果收到的消息为“取消”则更新消息状态为“已取消”

  6. 如果上一步更新的数据库为“待发送”,那么会开始往MQ投递消息,并且更改数据库里的消息记录的状态为“已发送”

  7. 上一步往MQ投递消息成功后,MQ会给被动方推送消息。

  8. 被动方收到消息后开始处理业务

  9. 如果业务处理成功,则被动方对MQ进行ACK回复,则这条消息会从MQ内移除掉

  10. 如果业务处理成功,则发送“已完成”消息给可靠消息服务

  11. 可靠消息服务收到“已完成”消息后更新数据库消息记录未“已完成”

废话不多说了,下面让我们演示下如何使用 AgileDT 来快速实现一个基于可靠消息的分布式事务。
以下我们还是以经典的订单下单完成给会员赠送积分的场景来演示。

使用 AgileDT

依赖组件

  • mysql

  • rabbitmq

目前支持 mysql 数据库,但是数据访问组件使用的是 freesql 所以后续要实现支持别的数据库也很简单。目前框架使用的可靠消息服务为 rabbitmq 。

运行服务端

在服务新建一个数据库并且新建一张表

// crate event_message table on mysql
create table if not exists event_message
(
 event_id varchar(36) not null
  primary key,
 biz_msg varchar(4000) null,
 status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
 create_time datetime(3) null,
 event_name varchar(255) null
);

使用docker-compose运行服务端

version: "3"  # optional since v1.27.0
services:
  agile_dt:
    image: "kklldog/agile_dt"
    ports:
      - "5000:5000"
    environment:
      - db:provider=mysql
      - db:conn= Database=agile_dt;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306
      - mq:userName=admin
      - mq:password=123456
      - mq:host=192.168.0.115
      - mq:port=5672

安装客户端

在主动方跟被动方都需要安装AgileDT的客户端库

Install-Package AgileDT.Client

主动方使用方法

  1. 在业务数据库添加事务消息表

// crate event_message table on mysql
create table if not exists event_message
(
 event_id varchar(36) not null
  primary key,
 biz_msg varchar(4000) null,
 status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
 create_time datetime(3) null,
 event_name varchar(255) null
);
  1. 修改配置文件

在appsettings.json文件添加以下节点:
  "agiledt": 
    "server": "http://localhost:5000",
    "db": 
      "provider": "mysql",
      "conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
      //"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
    ,
    "mq": 
      "host": "192.168.0.125",
      //"host": "192.168.0.115",
      "userName": "admin",
      "password": "123456",
      "port": 5672
    
  
  1. 注入 AgileDT 客户端服务

public void ConfigureServices(IServiceCollection services)
        
            services.AddAgileDT();
            ...
        
  1. 实现IEventService方法
    处理主动方业务逻辑的类需要实现IEventService接口,并且标记那个方法是真正的业务方法。AgileDT在启动的时候会扫描这些类型,并且使用AOP技术生成代理类,在业务方法前后插入对应的逻辑来跟可靠消息服务通讯。这里要注意的几个地方:

  • 实现IEventService接口

  • 使用DtEventBizMethod注解标记业务入口方法

  • 使用DtEventName注解来标记事务的方法名称,如果不标记则使用类名

注意:业务方法最终一定要使用事务来同步修改消息表的status字段为done状态,这个操作框架没办法帮你实现
注意:业务方法如果失败请抛出Exception,如果不抛异常框架一律认为执行成功

public interface IAddOrderService:IEventService
    
        bool AddOrder(Order order);
    

    [DtEventName("orderservice.order_added")]
    public class AddOrderService : IAddOrderService
    
        private readonly ILogger<AddOrderService> _logger;

        public AddOrderService(ILogger<AddOrderService> logger)
        
            _logger = logger;
        

        public string EventId  
            get;
            set;
        
        
        [DtEventBizMethod]
        public virtual bool AddOrder(Order order)
        
            var ret = false;

            //3. 写 Order 跟 修改 event 的状态必选写在同一个事务内
            FreeSQL.Instance.Ado.Transaction(() =>
            
                order.EventId = EventId;//在订单表新增一个eventid字段,使order跟event_message表关联起来
                var ret0 = FreeSQL.Instance.Insert(order).ExecuteAffrows();
                var ret1 = FreeSQL.Instance.Update<OrderService.Data.entities.EventMessage>()
                .Set(x => x.Status, MessageStatus.Done)
                .Where(x => x.EventId == EventId)
                .ExecuteAffrows();

                ret = ret0 > 0 && ret1 > 0;
            );

            return ret;

        

        /// <summary>
        /// 构造后续业务处理需要的消息内容
        /// </summary>
        /// <returns></returns>
        public string GetBizMsg()
        
            //这里可以构造传递到MQ的业务消息的内容,比如传递订单编号啊 ,以便后续的被动方处理业务时候使用
            var order = FreeSQL.Instance.Select<Order>().Where(x => x.EventId == EventId).First();
            return order?.Id;
        
      
    

在实现好 IAddOrderService 接口后,你可以像平常一样使用 IAddOrderService 来注入实现类。比如在 Controller 的构造函数注入进去。因为 AgileDT 在启动的时候会自动帮你注册。

注意:IAddOrderService 跟实现类的生命周期是 Scoped 。

被动方使用方法

  1. 在业务方数据库建表或者在业务表上加字段
    对于被动方来说这里不是必须要建一个表。但是至少要有个地方来存储event_id的信息,最简单的是直接在业务主表上加event_id字段。

  2. 修改配置文件

在appsettings.json文件添加以下节点:
  "agiledt": 
    "server": "http://localhost:5000",
    "db": 
      "provider": "mysql",
      "conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
      //"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
    ,
    "mq": 
      "host": "192.168.0.125",
      //"host": "192.168.0.115",
      "userName": "admin",
      "password": "123456",
      "port": 5672
    
  
  1. 注入AgileDT服务

public void ConfigureServices(IServiceCollection services)
        
            services.AddAgileDT();
            ...
        
  1. 实现IEventMessageHandler接口
    被动方需要接收MQ投递过来的消息,这些处理类需要实现IEventMessageHandler接口。AgileDT启动的时候会去扫描这些类,然后跟MQ建立绑定关系。

  • 这里必须使用DtEventName注解标记需要处理的事件名称

  • Reveive 方法必须是幂等的

public interface IOrderAddedMessageHandler: IEventMessageHandler
    
    
    
    [DtEventName("orderservice.order_added")]
    public class OrderAddedMessageHandler: IOrderAddedMessageHandler
    
        static object _lock = new object();

        public bool Receive(EventMessage message)
        
            var bizMsg = message.BizMsg;
            var eventId = message.EventId;
            string orderId = bizMsg;

            lock (_lock)
            
                var entity = FreeSQL.Instance.Select<PointHistory>().Where(x => x.EventId == eventId).First();
                if (entity == null)
                
                    var ret = FreeSQL.Instance.Insert(new PointHistory
                    
                        Id = Guid.NewGuid().ToString(),
                        EventId = message.EventId,
                        OrderId = orderId,
                        Points = 20,
                        CreateTime = DateTime.Now
                    ).ExecuteAffrows();

                    return ret > 0;
                
                else
                
                    return true;
                
            
        
    

总结

通过以上演示,我们快速的实现了一个订单下单会员赠送积分的服务。可以看到使用 AgileDT 可以很快速的实现一个分布式事务。特别是在实现过一个分布式事务后,后面实现起来就特别简单,只要实现几个接口就可以了。AgileDT 才刚刚起步,希望大家多多支持,多多✨✨✨  ,多多 PR 。

https://github.com/kklldog/AgileDT

.Net Core with 微服务 - 可靠消息最终一致性分布式事务

.Net Core with 微服务 - 分布式事务 - TCC

.Net Core with 微服务 - 分布式事务 - 2PC、3PC

.netcorewith微服务(代码片段)

上一次我们介绍了Ocelot网关的基本用法。这次我们开始介绍服务注册发现组件Consul的简单使用方法。服务注册发现首先先让我们回顾下服务注册发现的概念。在实施微服务之后,我们的调用都变成了服务间的调用。服务间调用需... 查看详情

.netcorewith微服务(代码片段)

上一次我们介绍并演示了如果使用Consul做为我们微服务的注册中心,来实现服务的注册与发现。那么本次我们讲会演示如何做日志聚合。日志聚合比较常用的有ELK等,但是这次我想要介绍的是一款比较小众的日志聚合工具-Seq。... 查看详情

.netcorewith微服务(代码片段)

上一次我们介绍了Seq日志聚合组件。这次要给大家介绍的是ElasticAPM,一款应用程序性能监控组件。APM监控围绕对应用、服务、容器的健康监控,对接口的调用链、性能进行监控。在我们实施微服务后,由于复杂的业务逻辑,服... 查看详情

.netcorewith微服务-分布式事务-2pc3pc

最近比较忙,好久没更新了。这次我们来聊一聊分布式事务。在微服务体系下,我们的应用被分割成多个服务,每个服务都配置一个数据库。如果我们的服务划分的不够完美,那么为了完成业务会出现非常多的跨... 查看详情

.netcorewith微服务-polly服务降级熔断

在我们实施微服务之后,服务间的调用变的异常频繁。多个服务之间可能是互相依赖的关系。某个服务出现故障或者是服务间的网络出现故障都会造成服务调用的失败,进而影响到某个业务服务处理失败。某一个服务调... 查看详情

.netcorewith微服务-分布式事务-tcc

上一次我们讲解了分布式事务的2PC、3PC。那么这次我们来理一下TCC事务。本次还是讲解TCC的原理跟.NET其实没有关系。TCCTry准备阶段,尝试执行业务Confirm完成业务Cancel回滚准备阶段的业务TCC事务其实是2PC的一个扩展。上一次... 查看详情

.netcorewith微服务-可靠消息最终一致性分布式事务

前面我们讲了分布式事务的2PC、3PCTCC的原理。这些事务其实都在尽力的模拟数据库的事务,我们可以简单的认为他们是一个同步行的事务。特别是2PC,3PC他们完全利用数据库的事务能力,在一阶段开始事务后不进提交会严... 查看详情

使用 JWT 保护微服务之间的通信

】使用JWT保护微服务之间的通信【英文标题】:SecurecommunicationbetweenmicroservicesusingJWT【发布时间】:2019-11-2300:08:28【问题描述】:我使用SpringBoot构建了3个微服务:1)Auth服务-创建JWT。2和3-做某事的微服务(RESTAPI)。理论上,用户可... 查看详情

微服务如何使用另一个微服务的安全配置

】微服务如何使用另一个微服务的安全配置【英文标题】:Howcanamicroserviceusethesecurityconfigofanothermicroservice【发布时间】:2019-11-2710:32:44【问题描述】:我有一个用户微服务处理与用户相关的所有事情,包括安全性(创建用户、登... 查看详情

chrisrichardson微服务系列使用微服务重构单体应用-7

编者的话|本文来自Nginx官方博客,是「ChrisRichardson微服务」系列的最后一篇。第一篇介绍了微服务架构模块,并且讨论了使用微服务的优缺点。随后的文章讨论了微服务的不同方面,包括使用API网关、进程间通讯、服务发现、事... 查看详情

如何使用 Apache Kafka 使用 SpringBoot 将数据从一个微服务发送到另一个微服务?

】如何使用ApacheKafka使用SpringBoot将数据从一个微服务发送到另一个微服务?【英文标题】:HowtouseApacheKafkatosenddatafromonemicroservicetoothermicroserviceusingSpringBoot?【发布时间】:2020-01-1003:31:20【问题描述】:如何使用ApacheKafka通过SpringBo... 查看详情

使用 jwt 在微服务中进行身份验证

】使用jwt在微服务中进行身份验证【英文标题】:Authenticationinmicroservicesusingjwt【发布时间】:2020-06-1210:11:04【问题描述】:我将使用Laravel框架构建微服务。我有用户微服务来处理客户端凭据并对其进行身份验证(为客户端创建J... 查看详情

如何使用 docker 镜像从另一个微服务调用一个微服务

】如何使用docker镜像从另一个微服务调用一个微服务【英文标题】:Howtocallonemicroservicefromanothermicroserviceusingdockerimages【发布时间】:2018-12-2705:33:43【问题描述】:我有两个SpringBoot微服务M1(port2002)和M2(port2004)如果我使用eclipse运行... 查看详情

为啥使用 gRPC 进行微服务间通信?

】为啥使用gRPC进行微服务间通信?【英文标题】:WhyusegRPCforintermicroservicecommunication?为什么使用gRPC进行微服务间通信?【发布时间】:2021-09-2208:59:33【问题描述】:我正在学习gRPC使用node.js实现的东西。我读到gRPC和微服务间的... 查看详情

微服务 - 如何使用 JWT 对单独的 API 微服务进行身份验证

】微服务-如何使用JWT对单独的API微服务进行身份验证【英文标题】:Microservices-HowtoauthenticateseparateAPIMicroservicewithJWT【发布时间】:2020-05-0216:00:52【问题描述】:我正在尝试在springboot(java)中构建一个微服务。我正在使用Java中的... 查看详情

使用 JWT 的微服务认证和授权

】使用JWT的微服务认证和授权【英文标题】:MicroserviceAuthenticationandAuthorizationusingJWT【发布时间】:2020-11-2905:50:27【问题描述】:我创建了两个微服务让A和B。每个微服务都有自己的数据库和用于存储用户名和密码的用户表。我... 查看详情

是否可以使用 ORM 构建微服务?

】是否可以使用ORM构建微服务?【英文标题】:IsitpossibletobuildmicroservicewithORM?【发布时间】:2021-12-0417:39:45【问题描述】:我想开始练习使用SpringBoot和MySQL创建微服务。但对我来说有一个悬而未决的问题。如果可能的话,如果实... 查看详情

使用 grafana 监控微服务

】使用grafana监控微服务【英文标题】:Monitoringmicroserviceusinggrafana【发布时间】:2019-03-2513:06:08【问题描述】:我们有多个微服务,它们具有JSON形式的健康端点。JSON可能包含微服务将调用的其他服务的状态。有没有办法可以在Gr... 查看详情