django(64)频率认证源码分析与自定义频率认证(代码片段)

修炼之路 修炼之路     2022-11-29     558

关键词:

前言

有时候我们发送手机验证码,会发现1分钟只能发送1次,这是做了频率限制,限制的时间次数,都由开发者自己决定
 

频率认证源码分析

def check_throttles(self, request):
    """
    检查是否应限制请求。如果请求受到限制,则引发适当的异常。
    """
    throttle_durations = []
    # 1.遍历配置的频率认证类,初始化一个个频率认证类对象(会调用频率认证类的__init__()方法)
    # 2.频率认证类对象调用allow_request()方法,频率是否限次(没有限次可访问,限次不可访问)
    # 3.频率认证类限次后,调用wait方法,获取还需多长时间可以进行下一次访问
    for throttle in self.get_throttles():
        if not throttle.allow_request(request, self):
            throttle_durations.append(throttle.wait())

    if throttle_durations:
        # Filter out `None` values which may happen in case of config / rate
        # changes, see #1438
        durations = [
            duration for duration in throttle_durations
            if duration is not None
        ]

        duration = max(durations, default=None)
        self.throttled(request, duration)

 

get_throttles()

我们首先来查看get_throttles()源码

def get_throttles(self):
    """
    实例化并返回此视图使用的节流阀列表。
    """
    return [throttle() for throttle in self.throttle_classes]

然后点击throttle_classes,跳转到APIView后查看源码

throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES

接着我们去settings.py文件中查看,发现\'DEFAULT_THROTTLE_CLASSES\': [],默认是一个空列表,那么我们就知道了for throttle in self.get_throttles()其实是去遍历列表中配置的频率认证,至于列表中需要填写什么,我们后续再看
 

allow_request

接下来我们查看allow_request方法,它是drf中的throtting.py文件中BaseThrottle类中的方法,我们查看下BaseThrottle源码

class BaseThrottle:
    """
    Rate throttling of requests.
    """

    def allow_request(self, request, view):
        """
        如果应该允许请求,则返回 `True`,否则返回 `False`。
        """
        raise NotImplementedError(\'.allow_request() must be overridden\')

    def get_ident(self, request):
        """
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get(\'HTTP_X_FORWARDED_FOR\')
        remote_addr = request.META.get(\'REMOTE_ADDR\')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(\',\')
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return \'\'.join(xff.split()) if xff else remote_addr

    def wait(self):
        """
        返回推荐的在下一个请求之前等待的秒数
        """
        return None

可以看到BaseThrottle类下有3个方法

  • allow_request:如果需要继承该类,必须重写此方法
  • get_ident:获取身份
  • wait:返回等待的秒数
     

SimpleRateThrottle

throtting中有个SimpleRateThrottle继承自BaseThrottle,我们大多数情况下都会自定义SimpleRateThrottle类,让我们查看下源码,看他干了哪些事情

class SimpleRateThrottle(BaseThrottle):
    """
    一个简单的缓存实现,只需要提供get_cache_key方法即可

    速率(requests / seconds)由 View 类上的 `rate` 属性设置。该属性是“number_of_requests/period”形式的字符串。

    period应该是以下之一:(\'s\', \'sec\', \'m\', \'min\', \'h\', \'hour\', \'d\', \'day\')

    用于限制的先前请求信息存储在缓存中
    """
    cache = default_cache
    timer = time.time
    cache_format = \'throttle_%(scope)s_%(ident)s\'
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
    
    def __init__(self):
        if not getattr(self, \'rate\', None):
            self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)
    
    def get_cache_key(self, request, view):

    def get_rate(self):

    def parse_rate(self, rate):

    def allow_request(self, request, view):

    def throttle_success(self):

    def throttle_failure(self):

    def wait(self):

我们可以看到SimpleRateThrottle有5个属性

  • cache:默认的django中的缓存
  • timer:当前时间
  • cache_format:缓存的格式throttle_%(scope)s_%(ident)s
  • scope:范围
  • THROTTLE_RATES:默认的频率
     

除了属性,还有8个方法,我们依次查看源码

init

def __init__(self):
    if not getattr(self, \'rate\', None):
        self.rate = self.get_rate()
    self.num_requests, self.duration = self.parse_rate(self.rate)

代码讲解:如果没有获取到rate属性,那么rate属性就从get_rate()方法中获取,拿到后,从parse_rate方法中解析出一个元组,包含2个元素num_requestsduration

  • num_request:请求次数
  • duration:持续时间
     

get_rate

既然上面用到了此方法,我们就来看看

def get_rate(self):
    """
    确定允许的请求速率用字符串表示形式。
    """
    if not getattr(self, \'scope\', None):
        msg = ("You must set either `.scope` or `.rate` for \'%s\' throttle" %
               self.__class__.__name__)
        raise ImproperlyConfigured(msg)

    try:
        return self.THROTTLE_RATES[self.scope]
    except KeyError:
        msg = "No default throttle rate set for \'%s\' scope" % self.scope
        raise ImproperlyConfigured(msg)

代码讲解:如果没有获取到scope属性,会抛出异常信息,如果有scope就从THROTTLE_RATES[self.scope]中返回它,THROTTLE_RATES默认值如下:

\'DEFAULT_THROTTLE_RATES\': 
        \'user\': None,
        \'anon\': None,
    ,

所以get_rate方法返回的是THROTTLE_RATESkeyscope所对应的值,scope属性我们可以自定义的时候随意设置,如果我们自定义scopeuser,那么get_rate方法返回的就是None,所以self.rate也就为None
 

parse_rate

获取到rate,用此方法解析

def parse_rate(self, rate):
    """
    提供请求速率字符串,返回一个二元组
    允许请求的次数, 以秒为单位的时间段
    """
    if rate is None:
        return (None, None)
    num, period = rate.split(\'/\')
    num_requests = int(num)
    duration = \'s\': 1, \'m\': 60, \'h\': 3600, \'d\': 86400[period[0]]
    return (num_requests, duration)

代码讲解:如果rateNone,那么就返回(None, None),如果不为None,会把rate/分割,从这里我们就知道了rate的字符串的形式就是num/period,比如3/min,最终会把他分割,然后返回一个元组

  • num_requests:请求的次数
  • duration:取period中的下标为0的,然后从字典中取出对应的key的值,比如min,第一个开头字母为m,最后从字典中取m的值,就是60

所以示例3/min代表的就是1分钟可以访问3次
 

get_cache_key

def get_cache_key(self, request, view):
    """
    应该返回可用于限制的唯一cache-key。必须被覆盖。

    如果不限制请求,则可能返回“None”。
    """
    raise NotImplementedError(\'.get_cache_key() must be overridden\')

这个方法很简单,就是获取唯一的缓存key,如果请求不做限制,则返回None
 

allow_request

由于父类BaseThrottleallow_request方法没有实现具体的逻辑,所以SimpleRateThrottle中实现了具体的细节

def allow_request(self, request, view):
    """
    如果请求应该被节流,那么实行检查以便查看

    成功时调用`throttle_success`.
    失败时调用`throttle_failure`.
    """
    if self.rate is None:
        return True

    self.key = self.get_cache_key(request, view)
    if self.key is None:
        return True

    self.history = self.cache.get(self.key, [])
    self.now = self.timer()

    # 从历史记录中删除现在已经超过节流持续时间的任何请求
    while self.history and self.history[-1] <= self.now - self.duration:
        self.history.pop()
    if len(self.history) >= self.num_requests:
        return self.throttle_failure()
    return self.throttle_success()

代码讲解:如果rateNone就返回True,代表允许请求,如果key也为None则返回True,代表允许请求,如果ratekey都有值,history就从缓存中获取key所对应的列表,now代表当前时间。如果history有值,并且列表history的最后一个元素≤当前时间-持续时间,那么history列表就会删除这个元素,如果列表长度≥请求次数,就会调用throttle_failure,如果列表长度<请求次数,则调用throttle_success

举例:如果self.now假设为晚上20:00,durationnum_requests就用之前3/min的示例,duration表示60s,num_requests表示3次,那么self.now-self.duration就代表19:59分,如果history列表中的最后一个元素的时间值≤19:59,那么就删除它,我们的需求是3/min一分钟只能访问3次,而你超过了1分钟,就没必要限制了,所以将时间从history删除,如果history列表长度≥3,一开始是空列表的时候不满足条件,会返回throttle_success,第二次访问列表长度会增加到1,但还是不满足条件,会继续调用throttle_success,第三次访问列表长度为2,仍然不满足会继续调用throttle_success,第四次访问满足条件,就会调用throttle_failure,代表不能再请求了
 

throttle_success

def throttle_success(self):
    """
    将当前请求的时间戳与键一起插入缓存中。
    """
    self.history.insert(0, self.now)
    self.cache.set(self.key, self.history, self.duration)
    return True

代码详解:将当前时间插入到history列表的头部,给缓存设置key的值为当前时间,超时时间为duration,最后返回True,代表可以访问
 

throttle_failure

def throttle_failure(self):
    """
    当对 API 的请求由于节流而失败时调用。
    """
    return False

返回False,代表请求节流失败,不允许访问
 

wait

def wait(self):
    """
    以秒为单位返回推荐的下一个请求时间。
    """
    if self.history:
        remaining_duration = self.duration - (self.now - self.history[-1])
    else:
        remaining_duration = self.duration

    available_requests = self.num_requests - len(self.history) + 1
    if available_requests <= 0:
        return None

    return remaining_duration / float(available_requests)

代码解析:如果history列表存在,remaining_duration剩余时间就等于持续时间减去(当期时间-列表最后一个元素的时间),如果self.now为晚上20:00,history的最后一个元素值为19:59:30,而持续时间duration设置为60s,那么remaining_duration就代表还剩30s就可以进行访问了,而available_requests可用请求等于(设置好的请求次数-history列表+1)
 

自定义频率认证

  1. 自定义一个继承SimpleRateThrottle类的频率类
  2. 设置一个scope类属性,属性值为任意见名知意的字符串
  3. settings配置文件中,配置drfDEFAULT_THROTTLE_RATES,格式为scope对应的字符串值:\'次数/时间\'
  4. 在自定义频率类中重写get_cache_key方法
    限制的对象返回与限制信息有关的字符串
    不限制的对象返回None
     

需求:用户访问短信验证码1分钟只能发送1次验证码
我们创建一个throttles.py文件,然后定义SMSRateThrottle类,代码如下:

from rest_framework.throttling import SimpleRateThrottle


class SMSRateThrottle(SimpleRateThrottle):
    scope = "sms"

    def get_cache_key(self, request, view):
        phone = request.query_params.get(\'phone\') or request.data.get(\'phone\')
        # 没有手机号,就不做频率限制
        if not phone:
            return None
        # 返回可以根据手机号动态变化,且不易重复的字符串,作为操作缓存的key
        return f"throttle_self.scope_phone"

settings.py文件中配置DEFAULT_THROTTLE_RATES,代码如下:

\'DEFAULT_THROTTLE_RATES\': 
        \'sms\': \'1/min\'
    ,

最后再视图函数中,局部配置自定义认证类

class TestView(APIView):
    throttle_classes = [SMSRateThrottle]

    def get(self, request, *args, **kwargs):
        return APIResponse(data_msg="get 获取验证码")

    def post(self, request, *args, **kwargs):
        return APIResponse(data_msg="post 获取验证码")

具体测试细节过程就不再描述了,这里只讲述结果,当我们使用get或者post请求时,携带请求参数phone第一次发送请求,请求成功,第二次就会出现以下提示


    "detail": "请求超过了限速。 Expected available in 58 seconds."

58 seconds代表还剩58秒可以再次访问,至于58s是怎么算出来的,就是SimpleRateThrottle类中的wait方法实现的

drf认证权限频率(代码片段)

...定义权限三限流Throttling使用实列可选限流类实例自定义频率类一认证Authentication认证组件:校验用户-游客、合法用户、非法用户游客:代表校验通过,直接进入下一步校验(权限校验)合法用户:代表校验通过,将用户存储在re... 查看详情

三大认证源码分析(代码片段)

目录认证组件权限组件频率组件认证组件通过dispath方法,进入三大认证:首先看一下请求模块,进入request进入get_authenticators方法可以知道,请求模块二次封装request对象,包含解析模块,还将认证类们的对象存储在请求对象中。... 查看详情

drf-jwt认证组件权限组件频率组件的使用(代码片段)

目录drf-jwt认证组件、权限组件、频率组件的使用认证组件权限组件频率组件drf-jwt签发token源码分析自定义签发token实现多方式登录源码分析多方式登陆签发token实例频率组件自定义频率类drf-jwt认证组件、权限组件、频率组件的使... 查看详情

频率类组件-认证规图分析-jwt认证-drf-jwt插件(代码片段)

频率类源码#1)APIView的dispath方法中的self.initial(request,*args,**kwargs)点进去#2)self.check_throttles(request)进行频率认证频率组件原理分析 频率组件的核心源码分析defcheck_throttles(self,request):throttle_durations=[]#1)遍历配置的频率认证类... 查看详情

认证权限频率自定义签发token-多方式登录(代码片段)

...发token源码分析多方式登录签发tokenVIP用户认证权限例子频率组件自定义频率类三大认证流程图路由配置在应用下新建文件router.py#router.pyfromrest_framework.routersimportRoute,DynamicRoute,SimpleRouterasDRFSimpleRouterclassSimpleRouter(DRFSimpleRouter):routes... 查看详情

django自定义认证系统原理及源码分析解读

疑问Django在​​如何自定义用户登录认证系统的时候​​,大家都会里面立马说自定义一个或者多个backend,比如通过账号+密码、邮箱+密码,邮箱+验证码、手机号+短信验证码等等。然后设置在settings中配置一个​​AUTHENTICATION_BA... 查看详情

drf--频率组件jwt使用(代码片段)

频率类源码入口#1)APIView的dispath方法中的self.initial(request,*args,**kwargs)点进去#2)self.check_throttles(request)进行频率认证#频率组件核心源码分析defcheck_throttles(self,request):throttle_durations=[]#1)遍历配置的频率认证类,初始化得到一个个... 查看详情

10.22总结(代码片段)

频率类源码入口1.APIView的dispatch方法中的self.initall(request,*args,**kwargs) 点进去2.self.check_throttles(request)进行频率认证#频率组件核心源码分析defcheck_throttles(self,request):throttle_durations=[]#1.遍历配置的频率认证类,初始化得到一个个... 查看详情

认证权限频率自定义

fromrest_framework.authenticationimportBaseAuthenticationfromrest_framework.permissionsimportBasePermissionfromrest_framework.throttlingimportBaseThrottle,SimpleRateThrottlefromdjango_redisimportget_r 查看详情

三大认证之认证组件和权限组件(代码片段)

...客只读,自定义用户角色#认证通过:可以进入下一步校验(频率认证)#认证失败:抛出异常,返回403权限异常结果self.check_permissions(request)#频率组件:限制试图接口访问的频率次数-限制条件(IP,ID,唯一键),频率周期时间(s,m,h),频率的次数#没... 查看详情

djangorestframework之认证组件,权限组件,频率组件(代码片段)

...的,如果用户量足够大的话,服务器的压力也就比较大,并且django的session存到了django_session表中,不是很好操作,现在生产中使用的一个叫做token机制的方式比较多.可以使用这个机制把token存到用户信息里,当用户登录成功的时候放里... 查看详情

认证组件权限组件频率组件(代码片段)

认证组件、权限组件、频率组件一、Django权限六张表1.1、content_type表"""Course:name、type、days、price、vip_type基础免费课70中级学位课18069究极会员课360至尊会员Course:name、type、days、content_type_id基础免费课7null中级学位... 查看详情

drf权限频率(代码片段)

DRF权限频率什么是权限简单的说就是对某件事情决策的范围和程度,例如对数据的操作,普通用户只能查看,管理员用户可以增删改查权限组件源码执行APIView的方法,APIView返回View中的view函数,然后调用的dispatch方法,然后执行... 查看详情

django中间件限制用户访问频率(代码片段)

原:https://blog.csdn.net/weixin_38748717/article/details/79095399一、定义限制访问频率的中间件common/middleware.pyimporttimefromdjango.utils.deprecationimportMiddlewareMixinMAX_REQUEST_PER_SECOND=2#每秒访问次数classRequest 查看详情

django之频率组件(代码片段)

一、频率简介为了控制用户对某个url的请求 的频率,比如,一分钟以内,只能访问三次二、自定义频率类,自定义频率规则自定义的逻辑(1)取出访问者的ip(2)判断当前ip不在访问字典里,添加进去,并且直接返回True,... 查看详情

drf三大认证:认证组件-权限组件-权限六表-自定义认证组件的使用(代码片段)

...BAC):Role-Based-Access-Control;基于auth的认证规则(了解)。Django框架采用的是RBAC认证规则:通常分为:三表规则、五表规则、Django采用的是六表规则。三表:用户表、角色表、权限表五表:用户表、角色表、权限表、用户角色关... 查看详情

drf权限频率(代码片段)

...件源码我们之前说过了DRF的版本和认证~也知道了权限和频率跟版本认证都是在initial方法里初始化的~~其实我们版本,认证,权限,频率控制走的源码流程大致相同~~大家也可以在源码里看到~~我们的权限类一定要有has_permission方... 查看详情

drf权限频率(代码片段)

...件源码我们之前说过了DRF的版本和认证~也知道了权限和频率跟版本认证都是在initial方法里初始化的~~其实我们版本,认证,权限,频率控制走的源码流程大致相同~~大家也可以在源码里看到~~我们的权限类一定要有has_permission方... 查看详情