twemproxy发送流程探索——剖析twemproxy代码正编

我没货,只剩下水了 我没货,只剩下水了     2022-09-04     216

关键词:

本文想要完成对twemproxy发送流程——msg_send的探索,对于twemproxy发送流程的数据结构已经在《twemproxy接收流程探索——剖析twemproxy代码正编》介绍过了,msg_send和msg_recv的流程大致类似。请在阅读代码时,查看注释,英文注释是作者对它的代码的注解,中文注释是我自己的感悟。

函数msg_send

 1 rstatus_t
 2 msg_send(struct context *ctx, struct conn *conn)
 3 {
 4     rstatus_t status;
 5     struct msg *msg;
 6     /*表示活跃的发送状态*/
 7     ASSERT(conn->send_active);
 8     /*表示准备发送*/
 9     conn->send_ready = 1;
10     do {
11         /*获取下一次发送的msg开头*/
12         msg = conn->send_next(ctx, conn);
13         if (msg == NULL) {
14             /* nothing to send */
15             return NC_OK;
16         }
17         /*发送框架,在此框架内conn->send_ready会改变*/
18         status = msg_send_chain(ctx, conn, msg);
19         if (status != NC_OK) {
20             return status;
21         }
22 
23     } while (conn->send_ready);
24 
25     return NC_OK;
26 }

 

发送框架msg_send_chain

由于在发送时,其底层采用writev的高效发送方式,难免出现数据发送到一边,系统的发送队列已满的情况,面对这种尴尬的情况,你应该如何处理?twemproxy的作者给出了自己的方式。

  1 static rstatus_t
  2 msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
  3 {
  4     struct msg_tqh send_msgq;            /* send msg q */
  5     struct msg *nmsg;                    /* next msg */
  6     struct mbuf *mbuf, *nbuf;            /* current and next mbuf */
  7     size_t mlen;                         /* current mbuf data length */
  8     struct iovec *ciov, iov[NC_IOV_MAX]; /* current iovec */
  9     struct array sendv;                  /* send iovec */
 10     size_t nsend, nsent;                 /* bytes to send; bytes sent */
 11     size_t limit;                        /* bytes to send limit */
 12     ssize_t n;                           /* bytes sent by sendv */
 13 
 14     TAILQ_INIT(&send_msgq);
 15 
 16     array_set(&sendv, iov, sizeof(iov[0]), NC_IOV_MAX);
 17 
 18     /* preprocess - build iovec */
 19 
 20     nsend = 0;
 21     /*
 22      * readv() and writev() returns EINVAL if the sum of the iov_len values
 23      * overflows an ssize_t value Or, the vector count iovcnt is less than
 24      * zero or greater than the permitted maximum.
 25      */
 26     limit = SSIZE_MAX;
 27 
 28     /*
 29      *send_msgq是一个临时的发送队列,将当前能进行发送的msg,即处理完的msg
 30      *进行存储。发送队列仅仅自后面处理时能让调用者以msg的buf为单位处理。
 31      *sendv是一个字符串数组,由于发送底层采用的函数是writev,为此sendv将发
 32      *送的数据都存储在一起,sendv才是真正发送的数据内存。
 33      */
 34     for (;;) {
 35         ASSERT(conn->smsg == msg);
 36 
 37         TAILQ_INSERT_TAIL(&send_msgq, msg, m_tqe);
 38 
 39         for (mbuf = STAILQ_FIRST(&msg->mhdr);
 40              mbuf != NULL && array_n(&sendv) < NC_IOV_MAX && nsend < limit;
 41              mbuf = nbuf) {
 42             nbuf = STAILQ_NEXT(mbuf, next);
 43             /*
 44              *发送的信息是否为空,即发送开始的字节位置是否和结束位置一致。
 45              *在处理redis多key命令的mget,mdel,mset以及memcached多key命令
 46              *get,gets时,由于分片的原因,分片后的msg也会在客户端发送队列
 47              *中。在分片处理完要发送后,这些分片的msg应该不能被发送,为此,
 48              *对于分片的msg的pos进行了将msg的发送量置为空,这边的sendv在添
 49              *加发送内容时,忽视了这些分片。
 50              */
 51             if (mbuf_empty(mbuf)) {
 52                 continue;
 53             }
 54 
 55             mlen = mbuf_length(mbuf);
 56             if ((nsend + mlen) > limit) {
 57                 mlen = limit - nsend;
 58             }
 59 
 60             ciov = array_push(&sendv);
 61             ciov->iov_base = mbuf->pos;
 62             ciov->iov_len = mlen;
 63 
 64             nsend += mlen;
 65         }
 66 
 67         /*超过发送限制*/
 68         if (array_n(&sendv) >= NC_IOV_MAX || nsend >= limit) {
 69             break;
 70         }
 71 
 72         /*不存在发送内容*/
 73         msg = conn->send_next(ctx, conn);
 74         if (msg == NULL) {
 75             break;
 76         }
 77     }
 78 
 79     /*
 80      * (nsend == 0) is possible in redis multi-del
 81      * see PR: https://github.com/twitter/twemproxy/pull/225
 82      */
 83 
 84     /*发送函数conn_sendv*/
 85     conn->smsg = NULL;
 86     if (!TAILQ_EMPTY(&send_msgq) && nsend != 0) {
 87         n = conn_sendv(conn, &sendv, nsend);
 88     } else {
 89         n = 0;
 90     }
 91 
 92     nsent = n > 0 ? (size_t)n : 0;
 93 
 94     /* postprocess - process sent messages in send_msgq */
 95     /*
 96      *由于其发送函数底层采用writev,在发送过程中可能存在发送中断或者发送
 97      *数据没有全部发出的情况,为此需要通过实际发送的字节数nsent来确认系统
 98      *实际上发送到了哪一个msg的哪一个mbuf的哪一个字节pos,以便下一次从pos
 99      *开始发送实际的内容,以免重复发送相同的内容,导致不可见的错误。
100      */
101     for (msg = TAILQ_FIRST(&send_msgq); msg != NULL; msg = nmsg) {
102         nmsg = TAILQ_NEXT(msg, m_tqe);
103 
104         TAILQ_REMOVE(&send_msgq, msg, m_tqe);
105 
106         /*发送内容为空,进行发送完的处理*/
107         if (nsent == 0) {
108             if (msg->mlen == 0) {
109                 conn->send_done(ctx, conn, msg);
110             }
111             continue;
112         }
113 
114         /* adjust mbufs of the sent message */
115         for (mbuf = STAILQ_FIRST(&msg->mhdr); mbuf != NULL; mbuf = nbuf) {
116             nbuf = STAILQ_NEXT(mbuf, next);
117 
118             if (mbuf_empty(mbuf)) {
119                 continue;
120             }
121 
122             mlen = mbuf_length(mbuf);
123             if (nsent < mlen) {
124                 /* mbuf was sent partially; process remaining bytes later */
125                 /*此处确认了实际上发送到了哪一个msg的哪一个mbuf的哪一个字节pos*/
126                 mbuf->pos += nsent;
127                 ASSERT(mbuf->pos < mbuf->last);
128                 nsent = 0;
129                 break;
130             }
131 
132             /* mbuf was sent completely; mark it empty */
133             mbuf->pos = mbuf->last;
134             nsent -= mlen;
135         }
136 
137         /* message has been sent completely, finalize it */
138         if (mbuf == NULL) {
139             conn->send_done(ctx, conn, msg);
140         }
141     }
142 
143     ASSERT(TAILQ_EMPTY(&send_msgq));
144 
145     if (n >= 0) {
146         return NC_OK;
147     }
148 
149     return (n == NC_EAGAIN) ? NC_OK : NC_ERROR;
150 }

 发送函数conn_sendv

 writev作为一个高效的网络io,它的正确用法一直是个问题,这里给出了twemproxy的作者给出了自己正确的注解。对于其的异常处理值得借鉴

 1 ssize_t
 2 conn_sendv(struct conn *conn, struct array *sendv, size_t nsend)
 3 {
 4     ssize_t n;
 5 
 6     ASSERT(array_n(sendv) > 0);
 7     ASSERT(nsend != 0);
 8     ASSERT(conn->send_ready);
 9 
10     for (;;) {
11         /*这里的nc_writev就是writev*/
12         n = nc_writev(conn->sd, sendv->elem, sendv->nelem);
13 
14         log_debug(LOG_VERB, "sendv on sd %d %zd of %zu in %"PRIu32" buffers",
15                   conn->sd, n, nsend, sendv->nelem);
16 
17         if (n > 0) {
18             /*
19              *已发送数据长度比待发送数据长度小,说明系统发送队列已满或者不
20              *可写,此刻需要停止发送数据。
21              */
22             if (n < (ssize_t) nsend) {
23                 conn->send_ready = 0;
24             }
25             conn->send_bytes += (size_t)n;
26             return n;
27         }
28 
29         if (n == 0) {
30             log_warn("sendv on sd %d returned zero", conn->sd);
31             conn->send_ready = 0;
32             return 0;
33         }
34         /*
35          *EINTR表示由于信号中断,没发送成功任何数据,此刻需要停止发送数据。
36          *EAGAIN以及EWOULDBLOCK表示系统发送队列已满或者不可写,为此没发送
37          *成功任何数据,此刻需要停止发送数据,等待下次发送。
38          *除了上述两种错误,其他的错误为连接出现了问题需要停止发送数据并
39          *进行断链操作,conn->err非零时在程序流程中会触发断链。
40          */
41         if (errno == EINTR) {
42             log_debug(LOG_VERB, "sendv on sd %d not ready - eintr", conn->sd);
43             continue;
44         } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
45             conn->send_ready = 0;
46             log_debug(LOG_VERB, "sendv on sd %d not ready - eagain", conn->sd);
47             return NC_EAGAIN;
48         } else {
49             conn->send_ready = 0;
50             conn->err = errno;
51             log_error("sendv on sd %d failed: %s", conn->sd, strerror(errno));
52             return NC_ERROR;
53         }
54     }
55 
56     NOT_REACHED();
57 
58     return NC_ERROR;
59 }

小结

在这短短的数百行代码中,我们获知了msg_send的简单过程,最最重要的是我们知道了writev函数的发送内容处理和异常处理,特别是它如教科书般的异常处理方式使我收益良多。

 

kafkaconsumer架构设计剖析和源码全流程详解(代码片段)

Kafka作为一个分布式事件暂存和中转系统,最重要的两个功能便是,往Kafka生产数据的生产者KafkaProducer,和从Kafka拉取数据消费的消费者KafkaConsumer。今天我们主要讲解消费者,KafkaConsumer。我习惯从最朴素的问题开... 查看详情

(转)tinyhttp源码剖析

...感觉有很大收获,无论是unix的编程,还是GET/POST的Web处理流程,都清晰了不少。废话不说,开始我们的Server探索之旅。  (水平有限,如有错误之处,欢迎指正)  &nb 查看详情

深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的消息发送的全部流程和落盘原理分析指南(代码片段)

...进行使用和研究,借着这个机会,分析一下RocketMQ发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。技术范围分析的总体技术范围发送到存储,本文的主要目的是... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层源码挖掘系列」透彻剖析贯穿rocketmq的消费者端的运行核心的流程(上篇)

...消息模型是发布-订阅(Pub/Sub)是一种消息范式,消息的发送者(称为发布者、生产者、 查看详情

[转]twemproxy介绍与使用

Twemproxy是一种代理分片机制,由Twitter开源。Twemproxy作为代理,可接受来自多个程序的访问,按照路由规则,转发给后台的各个Redis服务器,再原路返回。该方案很好的解决了单个Redis实例承载能力的问题。当然,Twemproxy本身也是... 查看详情

tomcat学习笔记tomcat源码剖析

...笔记(三)Tomcat源码剖析Tomcat源码剖析tomcat启动流程tomcat请求处理流程mapper组件体系结构Tomcat源码剖析tomcat启动流程tomcat请求处理流程tomcat请求处理流程:当一个servlet请求到来的时候,tomcat是通过怎么样的机制定... 查看详情

laravel项目利用twemproxy部署redis集群的完整步骤

Twemproxy是一个代理服务器,可以通过它减少Memcached或Redis服务器所打开的连接数。下面这篇文章主要给大家介绍了关于laravel项目利用twemproxy部署redis集群的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下前... 查看详情

python监视twemproxy(代码片段)

查看详情

结合keepalivedh和haproxy组件实现twemproxy的高

插入排序的基本操作就是将一个数据插入到已经排好序的有序数据中,从而得到一个新的、http://www.ukmtey.com/个数加一的有序数据,算法适用于少量数据的排序,时间复杂度为O(n^2)。是稳定的排序方法。相反,如果在源节点找不... 查看详情

linux-centos7源码编译安装twemproxy服务(代码片段)

1.软件简单介绍Twemproxy2.编译安装Twemproxy编译安装过程中,参考现在搜索引擎能检索到文章都比较旧了,参考部分教程总是遇到各种缺少文件导致编译出现异常问题,下面教程参考GitHub官网说明,亲测可以编译安装Twemproxy成功。2.... 查看详情

twemproxy

eshop-detail-test:  listen:127.0.0.1:1111  hash:fnv1a_64  distribution:ketama  timeout:1000  redis:true  servers:    -127.0.0.1:6379:1test-redis-01    -127.0.0.1:6380:1test-redis-02eshop-detail-test:r 查看详情

ios底层探索之runtime:消息转发(代码片段)

...已经分析了动态方法解析阶段,本次内容主要对消息发送的第三个阶段——>消息转发进行分析。动态方法解析,不处理的话,就会进入消息转发流程,那么消息转发流程会调用哪些方法呢?偶然间,在... 查看详情

kafka控制器选举流程剖析(代码片段)

...大家剖析一下Kafka的控制器,了解一下Kafka控制器的选举流程。2.内容Kafka控制器,其实就是一个Kafka系统的Broker。它除了具有一般Broker的功能之外,还具有选举主题分区Leader节点的功能。在启动Kafka系统时,其中一个Broker会被选举... 查看详情

精华推荐|深入浅出sentinel原理及实战「原理探索专题」完整剖析alibaba微服务架构体系之轻量级高可用流量控制组件sentinel

Sentinel是什么?不要概念混淆啊!注意:本Sentinel与Redis服务Sentinel是两回事,压根不是一个概念,请大家不要混肴。Alibaba的SentinelSentinel是由阿里巴巴中间件团队开发的开源项目,是一种面向分布式微服务... 查看详情

oracle学习笔记图解深入剖析一个事务的操作流程

Oracle学习笔记图解深入剖析一个事务的操作流程这节课讲一下一个事务的操作流程内容有点难度先简单的看一下一)事务ID当一个事务开始以后在oracle数据库里面针对这个事务oracle会给它分配一个事务ID就是编号这个东西简单... 查看详情

redis集群方案之twemproxy+haproxy+keepalived+sentinel+主从复制(待实践)

首先说明一下,Twemproxy+HAProxy+Keepalived+Sentinel+主从复制-这里提到的技术不一定全部都用上,但是全部用上之后可以达到高可用。主从复制:实现数据一式多份的保障。Sentinel哨兵模式:实现主从节点的切换,比如主节点挂了之后... 查看详情

flask源码流程剖析

在此之前需要先知道类和方法,个人总结如下: 1.对象是类创建,创建对象时候类的__init__方法自动执行,对象()执行类的 __call__ 方法 2.类是type创建,创建类时候type的__init__方法自动执行,类() 执行type的 _... 查看详情

线程池处理用户请求的流程剖析

针对thrift多线程池的技术,存在如下几点的不理解1线程在空闲的时候是如何进行等待事件请求的2pendingTaskCountMax_参数的具体含义是什么3在当前的解决方案中采用TThreadPoolServer,而没有采用TNonblockingServer参考http://blog.csdn.net/j8daxue/ar... 查看详情