0
点赞
收藏
分享

微信扫一扫

drf的认证,权限,限流,版本组件

认证组件

utils.auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed


class MyAuthentication(BaseAuthentication):
    """Demo authenticator that rejects every request."""

    def authenticate(self, request):
        """Always raise ``AuthenticationFailed``; the request is not inspected."""
        failure = AuthenticationFailed("认证失败了")
        raise failure
        

#settings.py
"""
进行drf的配置
"""
# DRF configuration dictionary placed in settings.py.
REST_FRAMEWORK = {
    # Dotted import paths of the authentication classes.
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "utils.auth.MyAuthentication",
    ],
}

#views.py
from utils.auth import MyAuthentication


class User(APIView):
    """View that pins ``MyAuthentication`` and also checks the ``token`` query parameter itself."""

    # Per-view list of authentication classes.
    authentication_classes = [MyAuthentication]

    def get(self, request):
        """Return the demo payload for a known token, otherwise an error body."""
        supplied = request.query_params.get("token")
        if supplied:
            owner = models.UserInfo.objects.filter(token=supplied).first()
            if owner:
                return Response({"status": True, "data": [1, 2, 3]})
            # A token was sent but no UserInfo row carries it.
            return Response({"status": False, "data": "非法token"})
        # No token in the query string at all.
        return Response({"status": False, "data": "请提供token"})

效果:

drf的认证,权限,限流,版本组件_User

正儿八经的用法

#views.py
class User(APIView):
    """View that prints what authentication attached to the request."""

    def get(self, request):
        """Print ``request.user`` and ``request.auth``, then return the demo payload."""
        for attribute in ("user", "auth"):
            print(getattr(request, attribute))

        payload = {"status": True, "data": [1, 2, 3]}
        return Response(payload)

    
#auth.py
from app01 import models
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed



class MyAuthentication(BaseAuthentication):
    """Authenticator that looks up the ``token`` query parameter in ``UserInfo``."""

    def authenticate(self, request):
        """
        Authenticate the request and return a two-tuple of (user, token).

        Raises ``AuthenticationFailed`` when the token is missing or when no
        ``UserInfo`` row carries it.
        """
        supplied = request.query_params.get("token")
        if supplied:
            matched = models.UserInfo.objects.filter(token=supplied).first()
            if matched:
                # The pair becomes request.user and request.auth.
                return matched, supplied
            raise AuthenticationFailed({"status": 0, "msg": "token无效"})

        raise AuthenticationFailed({"status": 0, "msg": "认证失败"})

drf的认证,权限,限流,版本组件_User_02

这里因为在settings.py中写了认证代码的路径,所以当不需要认证时:
 authentication_classes = []
 比如你的登录,注册功能。

多个认证类

 authentication_classes = [class_a, class_b, class_c]
 #依次执行:某个类返回(user, auth)即认证成功并终止;返回None则继续执行下一个类;抛出AuthenticationFailed则立即终止并返回错误,不会再执行后面的类。

#如果所有认证类都返回None(即匿名访问),可以自定义此时request.user和request.auth的默认值
# DRF configuration with explicit values for the unauthenticated case.
REST_FRAMEWORK = {
    # Dotted import paths of the authentication classes.
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "utils.auth.MyAuthentication",
    ],
    "UNAUTHENTICATED_USER": None,
    "UNAUTHENTICATED_TOKEN": None,
}

#也可以是函数:
# Same configuration, but the unauthenticated values are given as callables.
REST_FRAMEWORK = {
    # Dotted import paths of the authentication classes.
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "utils.auth.MyAuthentication",
    ],
    "UNAUTHENTICATED_USER": lambda: "123",
    "UNAUTHENTICATED_TOKEN": lambda: "qwe",
}

理论依据:

drf的认证,权限,限流,版本组件_User_03

简单的流程分析

class APISettings:
    """Excerpt of DRF's settings object: resolves a setting on first attribute access."""

    def __getattr__(self, attr):
        """Resolve, cache and return the setting named ``attr``.

        The value comes from ``self.user_settings`` when present there and
        from ``self.defaults`` otherwise. Names listed in
        ``self.import_strings`` are passed through ``perform_import``.

        Raises:
            AttributeError: if ``attr`` is not a key of ``self.defaults``.
        """
        if attr not in self.defaults:
            raise AttributeError("Invalid API setting: '%s'" % attr)

        # Read the configured value first; fall back to the built-in default.
        try:
            val = self.user_settings[attr]
        except KeyError:
            val = self.defaults[attr]

        if attr in self.import_strings:
            # Turn the configured value into the imported object(s).
            val = perform_import(val, attr)

        # Store the result on the instance so later reads bypass __getattr__.
        self._cached_attrs.add(attr)
        setattr(self, attr, val)
        return val


class APIView(View):
    """Abridged excerpt of DRF's ``APIView`` showing where authentication hooks into a request."""

    # Looked up on ``api_settings``, which effectively reads the value
    # configured in the project's settings.py.
    authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

    def initialize_request(self, request, *args, **kwargs):
        """Build the ``Request`` wrapper for the incoming request.

        The wrapper receives the original request, this view's authenticators
        and the parser context.
        """
        parser_context = self.get_parser_context(request)
        return Request(
            # ``Request.__init__`` requires the original request as its first
            # positional argument.
            request,
            # The list of authentication classes for this view.
            authenticators=self.get_authenticators(),
            parser_context=parser_context,
        )

    def initial(self, request, *args, **kwargs):
        """Run authentication, permission and throttle checks, in that order."""
        self.perform_authentication(request)
        self.check_permissions(request)
        self.check_throttles(request)

    def dispatch(self, request, *args, **kwargs):
        """Wrap the request, run the pre-handler checks and call the handler.

        Any exception raised by the checks or by the handler is converted
        into a response by ``handle_exception``.
        """
        self.args = args
        self.kwargs = kwargs
        request = self.initialize_request(request, *args, **kwargs)
        self.request = request
        self.headers = self.default_response_headers  # deprecate?

        try:
            self.initial(request, *args, **kwargs)

            # Find the handler method named after the HTTP verb.
            if request.method.lower() in self.http_method_names:
                handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
            else:
                handler = self.http_method_not_allowed

            response = handler(request, *args, **kwargs)

        except Exception as exc:
            response = self.handle_exception(exc)

        self.response = self.finalize_response(request, response, *args, **kwargs)
        return self.response

class User(APIView):
    """View that sets ``MyAuthentication`` as its only authentication class."""

    authentication_classes = [MyAuthentication]

    def get(self, request):
        """Return the fixed demo payload."""
        payload = {"status": True, "data": [1, 2, 3]}
        return Response(payload)



class Request:
    """Excerpt of DRF's ``Request`` wrapper: stores the original request and authenticates lazily."""

    def __init__(self, request, parsers=None, authenticators=None,
                 negotiator=None, parser_context=None):
        """Store the wrapped request and the per-view collaborators.

        ``parsers`` and ``authenticators`` default to empty tuples, and
        ``negotiator`` defaults to ``self._default_negotiator()``.
        """
        assert isinstance(request, HttpRequest), (
            'The `request` argument must be an instance of '
            '`django.http.HttpRequest`, not `{}.{}`.'
            .format(request.__class__.__module__, request.__class__.__name__)
        )
        self._request = request
        self.parsers = parsers or ()
        self.authenticators = authenticators or ()
        self.negotiator = negotiator or self._default_negotiator()
        self.parser_context = parser_context
        self._data = Empty
        self._files = Empty
        self._full_data = Empty
        self._content_type = Empty
        self._stream = Empty

    @property
    def user(self):
        """Return ``self._user``, calling ``_authenticate()`` first if it is not set yet."""
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():
                self._authenticate()
        return self._user

    @user.setter
    def user(self, value):
        """Set the user on this wrapper and mirror it onto the wrapped request."""
        self._user = value
        self._request.user = value

    @property
    def auth(self):
        """Return ``self._auth``, calling ``_authenticate()`` first if it is not set yet."""
        if not hasattr(self, '_auth'):
            with wrap_attributeerrors():
                self._authenticate()
        return self._auth

    @auth.setter
    def auth(self, value):
        """Set the auth value on this wrapper and mirror it onto the wrapped request."""
        self._auth = value
        self._request.auth = value

            
            
#user调用
 def _authenticate(self):
        """
        Try each entry of ``self.authenticators`` in order.

        The first authenticator that returns a non-None value supplies
        ``self.user`` and ``self.auth``. If one raises
        ``exceptions.APIException``, ``_not_authenticated()`` is called and
        the exception is re-raised. If all of them return None,
        ``_not_authenticated()`` is called.
        """
        for authenticator in self.authenticators:
            try:
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                # The authenticator rejected the request: run the
                # unauthenticated hook, then let the error propagate.
                self._not_authenticated()
                raise

            if user_auth_tuple is not None:
                # Remember which authenticator succeeded and stop here.
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple
                return

        # No authenticator returned a value.
        # NOTE(review): presumably this fills in the UNAUTHENTICATED_* defaults; confirm.
        self._not_authenticated()
        
        

def initial(self, request, *args, **kwargs):
    """Run the authentication, permission and throttle hooks, in that order."""
    for hook_name in ("perform_authentication", "check_permissions", "check_throttles"):
        # Look each hook up only when its turn comes, then call it.
        getattr(self, hook_name)(request)



def dispatch(self, request, *args, **kwargs):
    """Wrap the request, run the pre-handler checks, then call the handler.

    Exceptions raised by the checks or by the handler are turned into a
    response by ``self.handle_exception``. The finalized response is stored
    on ``self.response`` and returned.
    """
    self.args, self.kwargs = args, kwargs
    request = self.initialize_request(request, *args, **kwargs)
    self.request = request
    self.headers = self.default_response_headers  # deprecate?

    try:
        self.initial(request, *args, **kwargs)

        # Pick the handler named after the HTTP verb, if that verb is allowed.
        verb = request.method.lower()
        handler = (
            getattr(self, verb, self.http_method_not_allowed)
            if verb in self.http_method_names
            else self.http_method_not_allowed
        )
        response = handler(request, *args, **kwargs)
    except Exception as exc:
        response = self.handle_exception(exc)

    self.response = self.finalize_response(request, response, *args, **kwargs)
    return self.response

权限组件

from rest_framework.permissions import BasePermission

#在视图函数之前应用
class MyPermission(BasePermission):
    """Permission class that grants every request."""

    def has_permission(self, request, view):
        """Return ``True`` so that every request passes this check."""
        return True


# Backward-compatible alias for the original, mis-capitalised class name.
MyPerMission = MyPermission

#添加配置,或者导包
# DRF configuration covering both authentication and permissions.
REST_FRAMEWORK = {
    # Dotted import paths of the authentication classes.
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "utils.auth.MyAuthentication",
    ],
    "UNAUTHENTICATED_USER": None,
    "UNAUTHENTICATED_TOKEN": None,
    # Dotted import paths of the permission classes.
    "DEFAULT_PERMISSION_CLASSES": [
        "utils.permission.MyPermission",
    ],
}

重新写表

from django.db import models


class UserInfo(models.Model):
    """User record with a username, password, level and optional token."""

    username = models.CharField(verbose_name="用户名", max_length=32)
    password = models.CharField(verbose_name="密码", max_length=32)
    # Choice keys are integers so they match the values an IntegerField
    # stores; with string keys the default ``1`` matched no choice.
    # NOTE(review): default=1 makes a new row an administrator; confirm that is intended.
    level = models.IntegerField(
        verbose_name="级别",
        choices=[(1, "管理员"), (2, "用户")],
        default=1,
    )
    # Nullable and optional in forms, so a row may have no token.
    token = models.CharField(verbose_name="token", max_length=32, null=True, blank=True)

手动添加普通用户

drf的认证,权限,限流,版本组件_REST_04

这里我为了方便,把配置注释了。手动添加

drf的认证,权限,限流,版本组件_REST_05

效果:

drf的认证,权限,限流,版本组件_User_06

源码

#initial方法调用


#同时意味着我们可以自定义message,code的具体值
def permission_denied(self, request, message=None, code=None):
    """Abort the request because a permission check failed.

    Args:
        request: object exposing ``authenticators`` and
            ``successful_authenticator``.
        message: detail passed to ``PermissionDenied``.
        code: error code passed to ``PermissionDenied``.

    Raises:
        exceptions.NotAuthenticated: authenticators are configured but none
            succeeded.
        exceptions.PermissionDenied: in every other case.
    """
    if request.authenticators and not request.successful_authenticator:
        raise exceptions.NotAuthenticated()
    # This raise must sit outside the ``if``; nested under it, it was
    # unreachable and the function returned without denying anything.
    raise exceptions.PermissionDenied(detail=message, code=code)

def get_permissions(self):
    """Instantiate each class in ``self.permission_classes`` and return the instances as a list."""
    instances = []
    for permission_cls in self.permission_classes:
        instances.append(permission_cls())
    return instances



def check_permissions(self, request):
    """Call ``permission_denied`` for each permission that refuses the request.

    The optional ``message`` and ``code`` attributes of the refusing
    permission object are forwarded; missing attributes become ``None``.
    """
    for checker in self.get_permissions():
        if checker.has_permission(request, self):
            continue
        self.permission_denied(
            request,
            message=getattr(checker, 'message', None),
            code=getattr(checker, 'code', None),
        )

应用场景

常见2:多个权限组件时,必须保证都具备权限才行。       3个权限组件 【必须管理员】【必须是北京用户】【年龄必须大于18】
这里是且的关系
class User1(APIView):
    permission_classes = [【必须管理员】]

class User2(APIView):
    permission_classes = [【必须管理员】, 【必须是北京用户】]

class User3(APIView):
    permission_classes = [【必须管理员】, 【必须是北京用户】, 【年龄必须大于18】]

总结:多个组件时且的关系

常见3:扩展源码 (多个组件的或关系)
class User(APIView):
permission_classes = [【必须管理员】, 【必须是北京用户】]

def check_permissions(self, request):
    """Allow the request as soon as any one permission grants it (OR semantics).

    ``permission_denied`` is called only when no permission grants the
    request, which includes the case of an empty permission list.
    """
    for permission in self.get_permissions():
        if permission.has_permission(request, self):
            # One granting permission is enough; skip the remaining ones.
            return
    self.permission_denied(request, message="无权访问")

限流组件

文件位置:

drf的认证,权限,限流,版本组件_限流_07

from rest_framework.throttling import BaseThrottle


class MyThrottle(BaseThrottle):
    """Demo throttle that refuses every request."""

    def allow_request(self, request, view):
        """Return ``False`` unconditionally."""
        allowed = False
        return allowed

drf的认证,权限,限流,版本组件_限流_08

可选择的基类

drf的认证,权限,限流,版本组件_REST_09

认证限流权限

限流配置的例子:

drf的认证,权限,限流,版本组件_REST_10

throttle.py

from rest_framework.throttling import SimpleRateThrottle


class MyThrottle(SimpleRateThrottle):
    """Rate throttle registered under the ``user`` scope."""

    scope = "user"

    def get_cache_key(self, request, view):
        """Build the cache key from ``cache_format``, the scope and the request's ident."""
        template = self.cache_format
        fields = {
            'scope': self.scope,
            'ident': self.get_ident(request),
        }
        return template % fields

    "DEFAULT_THROTTLE_CLASSES": ["utils.throttle.MyThrottle", ],
    'DEFAULT_THROTTLE_RATES': {
        'user': "5/hour",  # 3600s内访问5次
        'anon': None,
    },

drf的认证,权限,限流,版本组件_限流_11

版本

参数

"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.QueryParameterVersioning",

效果:

drf的认证,权限,限流,版本组件_REST_12

drf的认证,权限,限流,版本组件_REST_13

以路由的形式:

 path('<str:version>/user/', views.User.as_view()),
    
"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
    
    
class User(APIView):

    def get(self, request, *args, **kwargs):    

drf的认证,权限,限流,版本组件_限流_14

drf的认证,权限,限流,版本组件_REST_15

请求头

"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.AcceptHeaderVersioning",
    
    
 path('user/', views.User.as_view()),
    
    
class User(APIView):
    """View that prints the version resolved for the request."""

    def get(self, request):
        """Print ``request.version`` and return the demo payload."""
        # Imported here so the snippet does not depend on an import that
        # the surrounding text never shows.
        from rest_framework.response import Response

        print("请求正常执行")
        print(request.version)
        # A view has to return a response object; the original fell off the
        # end and returned None.
        return Response({"status": True, "data": [1, 2, 3]})

drf的认证,权限,限流,版本组件_User_16

drf的认证,权限,限流,版本组件_REST_17

源码

def determine_version(self, request, *args, **kwargs):
    """Return ``(version, scheme)`` for the request.

    With no ``versioning_class`` configured the result is ``(None, None)``.
    Otherwise the class is instantiated and asked for the version.
    """
    versioning_cls = self.versioning_class
    if versioning_cls is not None:
        scheme = versioning_cls()
        version = scheme.determine_version(request, *args, **kwargs)
        return (version, scheme)
    return (None, None)

进行判断:

drf的认证,权限,限流,版本组件_REST_18


举报

相关推荐

0 条评论