go-zero微服务框架rpc的使用(代码片段)

特立独行的猫a 特立独行的猫a     2022-12-18     182

关键词:

接上一篇文章,按照”终端出厂实现自动化运维方案"思路,https://blog.csdn.net/yyz_1987/article/details/118358038

以终端状态上送监控服务为例,记录下go-zero微服务的简单使用,实现一个简易的后台监控云服务,监控所有出厂状态上报的终端状态。

新建一个Golang服务后台项目代码的目录,取名monitor。

环境准备

电脑或服务器安装有mysql,redis,etcd

下载好一些插件工具有:goctl,protoc.exe,proto-gen-go.exe

网关层实现

按照goctl这一代码生成神器的使用方式,首先定义一下终端需要上送的接口字段信息:

statusUpload.api

type (
	//终端状态上报内容
	StatusUploadReq 
		Sn     string `json:"sn"`     //设备唯一号
		Pos    string `json:"pos"`    //终端编号
		City   string `json:"city"`   //城市代码
		Id     string `json:"id"`     //终端类型
		Unum1  uint   `json:"unnum"`  //未传记录数量--公交
		Unum2  uint   `json:"unnum"`  //未传记录数量--三方
		Ndate  string `json:"ndate"`  //当前日期
		Ntime  string `json:"ntime"`  //当前时间
		Amount uint   `json:"amount"` //当班总额
		Count  uint   `json:"count"`  //当班人数
		Line   uint   `json:"line"`   //线路号

		Stime string `json:"stime"` //开机时间
		Ctime string `json:"ctime"` //关机时间

		Tenant uint `json:"tenant"` //租户ID
	

	//应答内容
	StatusUploadResp 
		Code int    `json:"code"`
		Msg  string `json:"msg"`
		Cmd  int    `json:"cmd"` //控制终端命令字
	
)

service open-api 
	@doc(
		summary: 公开的api函数
		desc: >statusUpload 终端状态上报
	)
	@server(
		handler: statusUploadHandler
		folder: open
	)
	post /open/statusUpload(StatusUploadReq) returns(StatusUploadResp)
	

接下来借助goctl神器的威力,直接生成网关层代码啦:

goctl api    go    -api       statusUpload.api   -dir    .

生成代码目录结构如下:

接下来跑起来试试:

go run open.go

网关层启动成功,侦听端口8888,在etc文件夹内的 open-api.yaml文件中有配置

分别用postman和curl工具测下:

curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json

 status.json文件内容:


    "sn": "1C2EB08D",
    "pos": "12345678",
    "city": "0371",
    "id": "B503",
    "unum1": 0,
    "unum2": 0,
    "ndate": "2021-08-07",
    "ntime": "18:30:30",
    "amount": 0,
    "count": 0,
    "line": 101,
    "stime": "05:01:01",
    "ctime": "18:30:20",
    "tenant": 0

RPC服务端实现

接下来,把它改造成微服务的形式,通过rpc调用服务提供的接口。大体结构如下:

需要提前安装就绪etcd环境,且需要安装一些插件工具,如proto.exe 和proto-gen-go.exe工具,放到go或gopath的bin目录下。

在项目代码跟目录下创建rpc文件夹,建个微服端的代码目录,这里取名为status。

定义proto文件,status.proto如下:

syntax = "proto3";

package status;

message statusUploadReq 
    string sn = 1;
    string pos = 2;
	string city  = 3;
	string id = 4;  
	uint32 unum1 = 5;
	uint32 unum2 = 6;
	string ndate = 7;
	string ntime = 8;
	uint32 amount = 9;
	uint32 count = 10;
	uint32 line  = 11;
	      
	string stime = 12;
	string ctime = 13;
	      
	uint32 tenant = 14;



message statusUploadResp 
    int32 code = 1;
    string msg = 2;
    int32 cmd = 3;


service statusUploader 
    rpc statusUpload(statusUploadReq) returns(statusUploadResp);

然后又是goctl神器发威了,自动生成代码,厉害不?

goctl rpc proto -src=status.proto  -dir    .

自动生成的文件目录如下:

自动生成的并不包含上图的那个client文件夹。client文件夹是为了单独测试rpc服务自己创建的,做个client端的demo调用一下rpc服务。model文件夹也是手工创建的,里面放数据库的操作接口。

 自动生成的rpc服务端status.go入口文件内容:

package main

import (
	"flag"
	"fmt"

	"monitor/rpc/status/internal/config"
	"monitor/rpc/status/internal/server"
	"monitor/rpc/status/internal/svc"
	"monitor/rpc/status/status"

	"github.com/tal-tech/go-zero/core/conf"
	"github.com/tal-tech/go-zero/zrpc"
	"google.golang.org/grpc"
)

var configFile = flag.String("f", "etc/status.yaml", "the config file")

func main() 
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)
	ctx := svc.NewServiceContext(c)
	srv := server.NewStatusUploaderServer(ctx)

	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) 
		status.RegisterStatusUploaderServer(grpcServer, srv)
	)
	defer s.Stop()

	fmt.Printf("Starting rpc server at %s...\\n", c.ListenOn)
	s.Start()

这时候如果启动了etcd,那么直接go run status.go,服务端就启动成功啦。

RPC客户端测试

为了验证下rpc服务端是工作正常的,在client文件夹中实现个zrpc的客户端测试一下:

client.go文件如下:

package main

import (
	"context"
	"fmt"
	"github.com/tal-tech/go-zero/core/discov"
	"github.com/tal-tech/go-zero/zrpc"
	"log"
	pb "monitor/rpc/status/status"
)

func main() 
	client := zrpc.MustNewClient(zrpc.RpcClientConf
		Etcd: discov.EtcdConf
			Hosts: []string"127.0.0.1:2379",
			Key:   "status.rpc",
		,
	)
	sclient := pb.NewStatusUploaderClient(client.Conn())
	reply, err := sclient.StatusUpload(context.Background(), &pb.StatusUploadReqSn: "test rpc", Pos: "go-zero")
	if err != nil 
		log.Fatal(err)
	
	fmt.Println(reply.Msg)

如果服务正常,会收到服务端接口的响应。

网关层调用改为微服务方式调用

可以把网关层改造下,改为微服务的调用方式。改动点并不大,如下:

第一步:

api目录api\\internal\\config路径下的config文件和api\\etc下的open-api.yaml文件改动:

open-api.yaml增加etcd的相关配置,用于连接到etcd服务中心,查找对应的服务方法。

注意,Config结构里的Status名字和那个配置文件中的是一一对应的,不能错。如果有多个微服务,这里 可以依次写上,如这种:

Status:
  Etcd:
    Hosts:
      - localhost:2379
    Key: status.rpc
Expander:
  Etcd:
    Hosts:
      - localhost:2379
    Key: expand.rpc
type Config struct 
  rest.RestConf
  Status    zrpc.RpcClientConf     // 手动代码
  Expander  zrpc.RpcClientConf     // 手动代码

 第二步:

api目录api\\internal\\svc路径下servicecontext.go文件改动:

 第三步:

api\\internal\\logic目录下statusuploadlogic.go文件改动,

 至此api网关层改造完成。可以模拟访问网关接口地址试试啦

curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json

定义数据库表结构,并生成 CRUD+cache 代码

  • monitor项目根路径下创建 rpc/model 目录:mkdir -p rpc/model
  • 在 rpc/model 目录下编写创建 tb_status表的 sql 文件status.sql,如下:
CREATE TABLE `tb_status`
(
   `id` INT UNSIGNED AUTO_INCREMENT,
   `sn` VARCHAR(32) NOT NULL COMMENT '设备唯一号',
   `posno` VARCHAR(32)  COMMENT '终端编号',
   `city` VARCHAR(16)   COMMENT '城市代码',
   `tyid` VARCHAR(16)   COMMENT '设备类型',
   `unum1`  INT   COMMENT '未传记录数--公交',
   `unum2`  INT   COMMENT '未传记录数--三方',
   `ndate`  DATE  COMMENT '当前日期',
   `ntime`  TIME  COMMENT '当前时间',
   `amount` INT   COMMENT '当班总额',
   `count`  INT   COMMENT '当班人数',
   `line`   INT   COMMENT '线路编号',
   `stime`  TIME  COMMENT '开机时间 ',
   `ctime`  TIME  COMMENT '关机时间 ',
   `tenant` INT   COMMENT '租户号 ',

  PRIMARY KEY(`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4;
  • 创建 DB,取名叫monitor, 和 table
create database monitor;

source status.sql;
  • rpc/model目录下执行如下命令生成 CRUD+cache 代码,-c表示使用redis cache
goctl model mysql ddl -c -src status.sql -dir .

也可以用datasource命令代替ddl来指定数据库链接直接从 schema 生成

生成后的文件结构如下:

rpc/model
├── status.sql
├── tbstatusmodel.go              // CRUD+cache代码
└── vars.go                       // 定义常量和变量

自动生成的tbstatusmodel.go 文件内容: 

package model

import (
	"database/sql"
	"fmt"
	"strings"

	"github.com/tal-tech/go-zero/core/stores/cache"
	"github.com/tal-tech/go-zero/core/stores/sqlc"
	"github.com/tal-tech/go-zero/core/stores/sqlx"
	"github.com/tal-tech/go-zero/core/stringx"
	"github.com/tal-tech/go-zero/tools/goctl/model/sql/builderx"
)

var (
	tbStatusFieldNames          = builderx.RawFieldNames(&TbStatus)
	tbStatusRows                = strings.Join(tbStatusFieldNames, ",")
	tbStatusRowsExpectAutoSet   = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), ",")
	tbStatusRowsWithPlaceHolder = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), "=?,") + "=?"

	cacheTbStatusIdPrefix = "cache::tbStatus:id:"
)

type (
	TbStatusModel interface 
		Insert(data TbStatus) (sql.Result, error)
		FindOne(id int64) (*TbStatus, error)
		Update(data TbStatus) error
		Delete(id int64) error
	

	defaultTbStatusModel struct 
		sqlc.CachedConn
		table string
	

	TbStatus struct 
		Id     int64          `db:"id"`
		Sn     sql.NullString `db:"sn"`     // 设备唯一号
		Posno  sql.NullString `db:"posno"`  // 终端编号
		City   sql.NullString `db:"city"`   // 城市代码
		Tyid   sql.NullString `db:"tyid"`   // 设备类型
		Unum1  sql.NullInt64  `db:"unum1"`  // 未传记录数--公交
		Unum2  sql.NullInt64  `db:"unum2"`  // 未传记录数--三方
		Ndate  sql.NullTime   `db:"ndate"`  // 当前日期
		Ntime  sql.NullString `db:"ntime"`  // 当前时间
		Amount sql.NullInt64  `db:"amount"` // 当班总额
		Count  sql.NullInt64  `db:"count"`  // 当班人数
		Line   sql.NullInt64  `db:"line"`   // 线路编号
		Stime  sql.NullString `db:"stime"`  // 开机时间
		Ctime  sql.NullString `db:"ctime"`  // 关机时间
		Tenant sql.NullInt64  `db:"tenant"` // 租户号
	
)

func NewTbStatusModel(conn sqlx.SqlConn, c cache.CacheConf) TbStatusModel 
	return &defaultTbStatusModel
		CachedConn: sqlc.NewConn(conn, c),
		table:      "`tb_status`",
	


func (m *defaultTbStatusModel) Insert(data TbStatus) (sql.Result, error) 
	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tbStatusRowsExpectAutoSet)
	ret, err := m.ExecNoCache(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant)

	return ret, err


func (m *defaultTbStatusModel) FindOne(id int64) (*TbStatus, error) 
	tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id)
	var resp TbStatus
	err := m.QueryRow(&resp, tbStatusIdKey, func(conn sqlx.SqlConn, v interface) error 
		query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table)
		return conn.QueryRow(v, query, id)
	)
	switch err 
	case nil:
		return &resp, nil
	case sqlc.ErrNotFound:
		return nil, ErrNotFound
	default:
		return nil, err
	


func (m *defaultTbStatusModel) Update(data TbStatus) error 
	tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, data.Id)
	_, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) 
		query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, tbStatusRowsWithPlaceHolder)
		return conn.Exec(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant, data.Id)
	, tbStatusIdKey)
	return err


func (m *defaultTbStatusModel) Delete(id int64) error 

	tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id)
	_, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) 
		query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
		return conn.Exec(query, id)
	, tbStatusIdKey)
	return err


func (m *defaultTbStatusModel) formatPrimary(primary interface) string 
	return fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, primary)


func (m *defaultTbStatusModel) queryPrimary(conn sqlx.SqlConn, v, primary interface) error 
	query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table)
	return conn.QueryRow(v, query, primary)

修改 monitor/rpc/status rpc 代码调用 crud+cache 代码 

  • 修改rpc/status/etc/status.yaml,增加如下内容:

  • 修改rpc/status/internal/config.go,如下:
package config

import "github.com/tal-tech/go-zero/zrpc"
//手动代码
import "github.com/tal-tech/go-zero/core/stores/cache"

type Config struct 
	zrpc.RpcServerConf
	DataSource string          // 手动代码
	Cache      cache.CacheConf // 手动代码


增加了 mysql 和 redis cache 配置 

  • 修改rpc/status/internal/svc/servicecontext.go,如下:
package svc

import "monitor/rpc/status/internal/config"

//手动代码
import "monitor/rpc/status/model"

type ServiceContext struct 
	Config config.Config
	Model  model.TbStatusModel // 手动代码


func NewServiceContext(c config.Config) *ServiceContext 
	return &ServiceContext
		Config: c,
		Model:  model.NewTbStatusModel(sqlx.NewMysql(c.DataSource), c.Cache), // 手动代码
	

  • 修改rpc/status/internal/logic/statusuploadlogic.go,如下:
package logic

import (
	"context"

	"monitor/rpc/status/internal/svc"
	"monitor/rpc/status/status"

	"github.com/tal-tech/go-zero/core/logx"
	//手动代码
	"database/sql"
	"monitor/rpc/status/model"
	"time"
)

type StatusUploadLogic struct 
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
	model model.TbStatusModel // 手动代码


func NewStatusUploadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StatusUploadLogic 
	return &StatusUploadLogic
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
		model:  svcCtx.Model, // 手动代码
	


func (l *StatusUploadLogic) StatusUpload(in *status.StatusUploadReq) (*status.StatusUploadResp, error) 
	// todo: add your logic here and delete this line
	// 手动代码开始,插入记录到数据库
	t, _ := time.Parse("2006-01-02", in.Ndate)
	_, err := l.model.Insert(model.TbStatus
		Sn:    sql.NullStringin.Sn, true,
		Posno: sql.NullStringin.Pos, true,
		City:  sql.NullStringin.City, true,
		Tyid:  sql.NullStringin.Id, true,
		Ndate: sql.NullTimet, true,
		Ntime: sql.NullStringin.Ntime, true,
	)
	if err != nil 
		return nil, err
	

	return &status.StatusUploadRespCode: 0, Msg: "server resp,insert data ok", Cmd: 1, nil

注意这里的sql.NullString和NullTime的写法,如果后面的第二个参数值为false,则插入到库中的值为空。

最后测试下,发现数据已经可以成功入库。

使用go-zero微服务框架实现云监控后台(三.终端状态更新json文件实现)(代码片段)

这是我计划的终端状态监控服务的终端部分的模块组件。终端应用程序定时更新状态文件,应用中跑的另一个后台服务则定时读取该状态文件并上送至后台服务。以此无耦合的实现对终端的状态监控。参见:https://blog.csdn... 查看详情

go-zero学习及使用中遇到的问题(代码片段)

go-zero学习及使用中遇到的问题1go-zero入门--单体服务demo1.1单体服务【官方示例】1.1.1创建greet服务1.1.2目录结构1.1.3编写逻辑1.1.4启动并访问服务1.2修改GET入参1.2.1去除options限制的入参值1.2.2重启并访问服务1.3添加post请求【新增方... 查看详情

go-zero成长之路—微服务电商实战系列(六条件查询)(代码片段)

...楚的,可通过如下传送门查看该系列其他文章:go-zero成长之路—微服务电商实战系列(五、RPC定义)go-zero成长之路—微服务电商实战系列(四、API定义)go-zero成长之路—微服务电商实战系列(三、表... 查看详情

微服务从代码到k8s部署应有尽有系列全集(代码片段)

...志到监控等各个方面的微服务完整实践。整个项目使用了go-zero开发的微服务,基本包含了go-zero以及相关go-zero作者开发的一些中间件,所用到的技术栈基本是go-zero项目组的自研组件,基本是go-zero全家桶了。实战项目地址:https:/... 查看详情

腾讯推出高性能rpc开发框架(代码片段)

... ,拉你进程序员交流群👇👇Tars是基于名字服务使用Tars协议的高性能RPC开发框架,同时配套一体化的服务治理平台,帮助个人或者企业快速的以微服务的方式构建自己稳定可靠的分布式应用。Tars是将腾讯内... 查看详情

带你十天轻松搞定go微服务系列(代码片段)

本文开始,我们会出一个系列文章跟大家详细展示一个go-zero微服务示例,整个系列分十篇文章,目录结构如下:环境搭建(本文)服务拆分用户服务产品服务订单服务支付服务RPC服务Auth验证服务监控链路追踪分布式事务期望通... 查看详情

带你十天轻松搞定go微服务系列(代码片段)

本文开始,我们会出一个系列文章跟大家详细展示一个go-zero微服务示例,整个系列分十篇文章,目录结构如下:环境搭建(本文)服务拆分用户服务产品服务订单服务支付服务RPC服务Auth验证服务监控链路追踪分布式事务期望通... 查看详情

go微服务grpc(代码片段)

gRPC教程RPC算是近些年比较火热的概念了,随着微服务架构的兴起,RPC的应用越来越广泛。本文介绍了RPC和gRPC的相关概念,并且通过详细的代码示例介绍了gRPC的基本使用。gRPC是什么gRPC是一种现代化开源的高性能RPC框... 查看详情

go微服务grpc(代码片段)

gRPC教程RPC算是近些年比较火热的概念了,随着微服务架构的兴起,RPC的应用越来越广泛。本文介绍了RPC和gRPC的相关概念,并且通过详细的代码示例介绍了gRPC的基本使用。gRPC是什么gRPC是一种现代化开源的高性能RPC框... 查看详情

微服务从代码到k8s部署应有尽有大结局(k8s部署)(代码片段)

...志到监控等各个方面的微服务完整实践。整个项目使用了go-zero开发的微服务,基本包含了go-zero以及相关go-zero作者开发的一些中间件,所用到的技术栈基本是go-zero项目组的自研组件,基本是go-zero全家桶了。实战项目地址:https:/... 查看详情

go-zero微服务开发环境搭建(代码片段)

原创不易,未经允许,请勿转载。更新于2022-03-04,增加插件安装文章目录一、Docker以及mysql、redis等软件的安装二、安装protoc-gen-go三、安装protoc四、安装goctl工具五、GoLand插件安装一、Docker以及mysql、redis等软件的安装... 查看详情

go-zero微服务开发环境搭建(代码片段)

原创不易,未经允许,请勿转载。更新于2022-03-04,增加插件安装文章目录一、Docker以及mysql、redis等软件的安装二、安装protoc-gen-go三、安装protoc四、安装goctl工具五、GoLand插件安装一、Docker以及mysql、redis等软件的安装... 查看详情

计网-怎样实现rpc框架(代码片段)

...名注册和发现负载均衡的设计可用性和容灾小结Pre随着微服务架构的盛行,远程调用成了开发微服务必不可少的能力,RPC框架作为微服务体系的底层支撑,也成了日常开发的必备工具。当下,RPC框架已经不仅是进... 查看详情

go微服务——rpc(代码片段)

...实现RPC1.1.4.RPC调用流程1.1.5.网络传输数据格式1.1.6.实现RPC服务端1.1.7.实现RPC客户端1.1.8.实现RPC通信测试1.RPC1.1.1.RPC简介远程过程调用(RemoteProcedureCall,RPC)是一个计算机通信 查看详情

分布式与微服务系列分布式rpc框架apachedubbo服务(代码片段)

分布式RPC框架ApacheDubbo服务一、ApacheDubbo概述1.1、Dubbo简介1.2、什么是RPC?1.3、什么是服务发现?1.3、Dubbo架构二、服务注册中心Zookeeper2.1、Zookeeper介绍2.2、安装Zookeeper2.3、启动、停止Zookeeper三、Dubbo快速入门3.1、服务提供... 查看详情

sprngcloud微服务框架搭建(代码片段)

...e/details/701488331、简介目前来说,SpringCloud是比较完整的微服务解决方案框架。不像其他rpc远程调用框架,只是解决某个微服务中的问题。2、微服务框架搭建2.1、服务的注册与发现Eureka(Finchley版本)本次采用Eureka作为服务注册与... 查看详情

go-zero:尝试使用go-zero的工具goctl进行model,controller代码生成,配置数据库,实现findall方法,查询数据库全部数据(代码片段)

目录前言1,关于go-zero框架2,使用goctl生成代码,安装工具3,使用goctl生成数据库model的crud代码4,使用goctl生成controller代码6,增加数据库,日志配置7,总结前言本文的原文连接是:https://blog.csdn.net/f... 查看详情

自适应微服务治理背后的算法(代码片段)

前言go-zero群里经常有同学问:服务监控是通过什么算法实现的?滑动窗口是怎么工作的?能否讲讲这块的原理?熔断算法是怎么设计的?为啥没有半开半闭状态呢?本篇文章,来分析一下go-zero中指标统计背后的实现算法和逻辑... 查看详情