手撸golanggo与微服务saga模式之1

ioly      2022-02-12     592

关键词:

缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,
每个Ti都有对应的补偿“Ci”,
当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。
T = T1 T2 … Tn
T = TCT

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
而应该将事务切分为一组按序依次提交的短事务,
Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

  • 为实现saga模式的分布式事务, 先模拟实现一个简单的pub/sub消息服务
  • 使用sqlx + sqlite3持久化消息, 使用gin作为http框架, 使用toml配置文件

代码结构

$ tree saga
saga
└── mqs
    ├── cmd
    │   └── boot.go
    ├── config
    │   └── config.go
    ├── database
    │   └── database.go
    ├── handlers
    │   ├── ping.go
    │   └── subscribe.go
    ├── logger
    │   └── logger.go
    └── routers
        └── routers.go

设计

  • config: 读取并解析toml配置文件
  • database: 封装sqlx + sqlite3
  • handlers/ping: 预留http服务保活探针接口
  • handlers/subscribe: 注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)
  • routers: 注册gin路由

config.go

读取并解析toml配置文件

package config

import  "github.com/BurntSushi/toml"

type tomlConfig struct {
    MQS tServiceConfig
}

type tServiceConfig struct {
    Port int
    LogDir string
}

var gServiceConfig = &tServiceConfig{}

func init() {
    cfg := &tomlConfig{}
    if _,err := toml.DecodeFile("./mqs.toml", cfg);err != nil {
        panic(err)
    }
    *gServiceConfig = cfg.MQS
}

func GetPort() int {
    return gServiceConfig.Port
}

func GetLogDir() string {
    return gServiceConfig.LogDir
}

database.go

封装sqlx + sqlite3

package database

import "github.com/jmoiron/sqlx"
import _ "github.com/mattn/go-sqlite3"

type DBFunc func(db *sqlx.DB) error
type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error

func init() {
    // must prepare tables
    err := DB(func(db *sqlx.DB) error {
        // table: sub_info
        _,e := db.Exec(`
        create table if not exists sub_info(
            id integer primary key autoincrement,
            client_id varchar(50) unique not null,
            topic varchar(100) not null,
            callback_url varchar(500) not null
        )`)
        return e
    })
    if err != nil {
        panic(err)
    }
}

func open() (*sqlx.DB, error) {
    return sqlx.Open("sqlite3", "./mqs.db")
}

func DB(action DBFunc) error {
    db,err := open()
    if err != nil {
        return err
    }
    defer func() { _ = db.Close() }()

    return action(db)
}

func TX(action TXFunc) error {
    return DB(func(db *sqlx.DB) error {
        tx, err := db.Beginx()
        if err != nil {
            return err
        }

        err = action(db, tx)
        if err == nil {
            return tx.Commit()
        } else {
            return tx.Rollback()
        }
    })
}

ping.go

预留http服务保活探针接口

package handlers

import (
    "github.com/gin-gonic/gin"
    "net/http"
    "time"
)

func Ping(c *gin.Context) {
    c.JSON(http.StatusOK, gin.H{ "ok": true, "time": time.Now().Format(time.RFC3339)})
}

subscribe.go

注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)

package handlers

import (
    "github.com/gin-gonic/gin"
    "github.com/gin-gonic/gin/binding"
    "github.com/jmoiron/sqlx"
    "learning/gooop/saga/mqs/database"
    "net/http"
)

type tSubMsg struct {
    ClientID string
    Topic string
    CallbackUrl string
}

func Subscribe(c *gin.Context) {
    msg := &tSubMsg{}
    if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
        c.AbortWithStatusJSON(
            http.StatusInternalServerError,
            gin.H{"error": err.Error()})
        return
    }
    
    err := database.DB(func(db *sqlx.DB) error {
        _,e := db.Exec(
            "replace into sub_info(client_id, topic, callback_url) values(?, ?, ?)",
            msg.ClientID,
            msg.Topic,
            msg.CallbackUrl)
        return e
    })

    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
    } else {
        c.JSON(http.StatusOK, gin.H { "ok": true })
    }
}

routers.go

注册gin路由

package routers

import (
    "github.com/gin-gonic/gin"
    "learning/gooop/saga/mqs/handlers"
)

func RegisterRouters() *gin.Engine {
    r := gin.Default()
    r.Use(gin.Logger())
    r.GET("/ping", handlers.Ping)
    r.POST("/subscribe", handlers.Subscribe)
    return r
}

(未完待续)

手撸golanggo与微服务chatserver之1

缘起最近阅读<<Go微服务实战>>(刘金亮,2021.1)本系列笔记拟采用golang练习之案例需求(聊天服务器)用户可以连接到服务器。用户可以设定自己的用户名。用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息... 查看详情

手撸golanggo与微服务chatserver之3压测与诊断

缘起最近阅读<<Go微服务实战>>(刘金亮,2021.1)本系列笔记拟采用golang练习之案例需求(聊天服务器)用户可以连接到服务器。用户可以设定自己的用户名。用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息... 查看详情

saga模式seatasaga模式详解(代码片段)

文章目录一、前言二、SAGA模式0、saga论文摘要1、什么是长事务?2、saga的组成3、saga的两种执行场景1)forwardrecovery2)backwardrecovery4、sagalog5、saga协调(saga实现方式)1)SAGA-Choreography策略2)SAGA-Orchestratio... 查看详情

手撸golangetcdraft协议之11

手撸golangetcdraft协议之11缘起最近阅读[云原生分布式存储基石:etcd深入解析](杜军,2019.1)本系列笔记拟采用golang练习之raft分布式一致性算法分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了... 查看详情

seata基础使用-分布式事务

...式事务三、Seata基础1、认识Seata2、部署TC(Server端)3、微服务集成Seata四、Seata事务管理-XA模式1、XA模式2、XA模式特点3、实现XA模式五、Seata事务管理-AT模式1、AT模式2、AT模式预防脏写3、AT模式特点4、AT模式实现六、Seata事务管理-... 查看详情

hm-springcloud微服务系列9.3.2实践:tcc模式saga模式

3.TCC模式3.1原理3.2实现4.SAGA模式4.1原理4.2实现 查看详情

手撸golang仿springioc/aop之4蓝图

手撸golang仿springioc/aop之4蓝图缘起最近阅读[SpringBoot技术内幕:架构设计与实现原理](朱智胜,2020.6)本系列笔记拟采用golang练习之Talkischeap,showmethecode.SpringSpring的主要特性:1.控制反转(InversionofControl,IoC)2.面向容器3.面向切面(Aspect... 查看详情

手撸golang仿springioc/aop之12增强3

手撸golang仿springioc/aop之12增强3缘起最近阅读[SpringBoot技术内幕:架构设计与实现原理](朱智胜,2020.6)本系列笔记拟采用golang练习之Talkischeap,showmethecode.SpringSpring的主要特性:1.控制反转(InversionofControl,IoC)2.面向容器3.面向切面(Aspe... 查看详情

手撸golang仿springioc/aop之5如何扫描

手撸golang仿springioc/aop之5如何扫描缘起最近阅读[SpringBoot技术内幕:架构设计与实现原理](朱智胜,2020.6)本系列笔记拟采用golang练习之Talkischeap,showmethecode.SpringSpring的主要特性:1.控制反转(InversionofControl,IoC)2.面向容器3.面向切面(... 查看详情

你这saga事务保“隔离性”吗?(代码片段)

...与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。注意最后一句话很关键,说明Saga模式的回滚其实和AT、TCC的回滚一样,都是反向补偿操作 查看详情

Kafka 主题过滤与微服务请求/回复模式的临时主题

】Kafka主题过滤与微服务请求/回复模式的临时主题【英文标题】:Kafkatopicfilteringvs.ephemeraltopicsformicroservicerequest/replypattern【发布时间】:2019-07-2016:35:46【问题描述】:我正在尝试使用Kafka实现请求/回复模式。我正在使用向这些服... 查看详情

手撸golang学etcd手写raft协议之12单元测试

手撸golang学etcd手写raft协议之12单元测试缘起最近阅读[云原生分布式存储基石:etcd深入解析](杜军,2019.1)本系列笔记拟采用golang练习之raft分布式一致性算法分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用... 查看详情

事件溯源是基于编排的 SAGA 模式的增强模式吗?

...时间】:2021-06-2906:50:57【问题描述】:这几天我在研究微服务间的通信模式。所以在我的研究中,我发现有两种模式叫做SAGA和事件溯源。但是我在互联网上找不到资源来了解这两种模式之间的区别。我的意思是我知道事件溯源... 查看详情

微服务设计模式(系列)-分布式事务(saga模式)

这里写自定义目录标题SagasSagas的实现范式Sagas每个Sagas由一系列sub-transactionTi(saga)组成每个Ti都有对应的补偿动作Ci,补偿动作用于撤销Ti造成的结果可以看到,和TCC相比,Saga没有“预提交”阶段,它的Ti... 查看详情

手撸golangspringioc/aop之2

手撸golangspringioc/aop之2缘起最近阅读[Offer来了:Java面试核心知识点精讲(框架篇)](王磊,2020.6)本系列笔记拟采用golang练习之Talkischeap,showmethecode.SpringSpring基于J2EE技术实现了一套轻量级的JavaWebService系统应用框架。它有很多优秀的... 查看详情

手撸golang行为型设计模式模板方法模式

手撸golang行为型设计模式模板方法模式缘起最近复习设计模式拜读谭勇德的<<设计模式就该这样学>>本系列笔记拟采用golang练习之模板方法模式模板方法模式(TemplateMethodPattern)又叫作模板模式,指定义一个操作中的算... 查看详情

手撸golangspringioc/aop之2

参考技术A手撸golangspringioc/aop之2最近阅读[Offer来了:Java面试核心知识点精讲(框架篇)](王磊,2020.6)本系列笔记拟采用golang练习之Talkischeap,showmethecode.配置接口指令接口指令构建器接口指令执行上下文接口保存配置另存配置添加... 查看详情

手撸golang行为型设计模式访问者模式

手撸golang行为型设计模式访问者模式缘起最近复习设计模式拜读谭勇德的<<设计模式就该这样学>>本系列笔记拟采用golang练习之访问者模式访问者模式(VisitorPattern)是一种将数据结构与数据操作分离的设计模式,指封装... 查看详情