0. 准备环境
* 1. 新建一个项目, 不创建模板层

* 2. 将rest_framework注册到app应用列表中.
INSTALLED_APPS = [
...
'rest_framework'
]
* 3. 自定义正常响应类与异常响应类
from rest_framework.response import Response
from rest_framework import status
class NormalResponse(Response):
def __init__(self, code=200, msg='访问成功!', data=None, status=status.HTTP_200_OK, **kwargs):
back_info = {'code': code, 'msg': msg}
if data:
back_info.update(data=data)
back_info.update(kwargs)
super().__init__(data=back_info, status=status)
from rest_framework.views import exception_handler
def exception_response(exc, context):
response = exception_handler(exc, context)
if not response:
if isinstance(exc, Exception):
error_info = f'遇到异常>>>:{exc}'
return NormalResponse(500, '访问失败', error_info, status.HTTP_500_INTERNAL_SERVER_ERROR)
error_info = response.data.get('detail')
if error_info:
error_info = response.data
return NormalResponse(500, '访问失败!!', error_info, status.HTTP_500_INTERNAL_SERVER_ERROR)
项目配置文件settings.py中, 全局配置dispatch异常响应.
只能全局配置, 自定义的异常响应, 值就是一个字符串, 不能是列表['自定义的异常响应']
REST_FRAMEWORK = {
'EXCEPTION_HANDLER': 'utils.response.exception_response',
}
* 4. 建立一个测试路由(采用路由分发)
from django.conf.urls import url, include
from django.contrib import admin
from app01 import urls
urlpatterns = [
url(r'^admin/', admin.site.urls),
url(r'^app01/api/', include(urls))
]
from django.conf.urls import url
from app01 import views
urlpatterns = [
url(r'^test1/', views.Test1.as_view())
]
* 5. 测试视图类
from rest_framework.views import APIView
from utils.reponse import NormalResponse
class Test1(APIView):
def get(self, request):
return NormalResponse(data='Test1')
* 6. 测试
get请求: 127.0.0.1:8000/app01/api/test1/

1. 自定义频率限制
1.1 SimpleRateThrottle源码

class SimpleRateThrottle(BaseThrottle):
cache = default_cache
timer = time.time
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
def get_cache_key(self, request, view):
raise NotImplementedError('.get_cache_key() must be overridden')
def get_rate(self):
if not getattr(self, 'scope', None):
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
def parse_rate(self, rate):
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
def allow_request(self, request, view):
if self.rate is None:
return True
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
self.history = self.cache.get(self.key, [])
self.now = self.timer()
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()
def throttle_success(self):
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
def throttle_failure(self):
return False
def wait(self):
if self.history:
remaining_duration = self.duration - (self.now - self.history[-1])
else:
remaining_duration = self.duration
available_requests = self.num_requests - len(self.history) + 1
if available_requests <= 0:
return None
return remaining_duration / float(available_requests)
1.2 SimpleRateThrottle使用
限制用户请求的ip每分钟只能访问三次.
请求ip在request.META中. request.META.get('REMOTE_ADDR') 获取.
* 1. 定义限制类
from rest_framework.throttling import SimpleRateThrottle
class Limit(SimpleRateThrottle):
scope = 'ip'
def get_cache_key(self, request, view):
ip_addr = request.META.get('REMOTE_ADDR')
return ip_addr
从配置文件DEFAULT_THROTTLE_RATES中根据scope得到频率配置(次数/时间单位)
类的scope数据作为频率认证类的数据值的key
* 2. 全局配置限制类
REST_FRAMEWORK = {
'EXCEPTION_HANDLER': 'utils.response.exception_response',
'DEFAULT_THROTTLE_CLASSES': ['utils.throttling.Limit',],
'DEFAULT_THROTTLE_RATES': {'ip': '3/m', },
}
* 3. 测试
1.3 自定义模块
自定义频率限制类, 需要有写两个方法:
1. allow_request(self, request, view)
如果应该允许请求,则返回True(不限次),否则返回False (限次)。
2. wait(seif) (可选)返回在下一个请求之前等待的建议秒数。
继承BaseThrottle, 该类中规范了子类的行为, 也可以不继承, 倒是类中必须有
源码中必须要执行allow_request方法.


BaseThrottle源码

代码逻辑:
0. 建立一个空字段 -->访问字典. {}
1. 从request.META.get('REMOTE_ADDR')中取出访问的ip.
2. 判断当前的ip在不在访问字典中, 若不在则以ip为键, 时间为值添加到字典中, 并且放回True, 表示第一次访问.
{'xxxip': [访问时间1, ]}
3. ip存在, 循环将列表取出来, 循环判断列表中每个值是否超过了60秒, 超过60秒的pop掉. 保证列表中只存60秒内
访问的时间信息, 退出循环.
{'xxxip': [访问时间3, 访问时间2, 访问时间1,]}
[1650897168.0505562, 1650897167.4371457, 1650897166.5577233] 时间戳
4. 统计列表所有的元素, 如果超过三次直接返回False, 访问失败
没有超过三次, 将当时时间插入到列表中, 访问成功
import time
class Limit2():
access_info = {}
ip = None
def allow_request(self, request, view):
ip = request.META.get('REMOTE_ADDR')
self.ip = ip
if ip not in self.access_info:
self.access_info[ip] = []
self.access_info[ip].append(time.time())
return True
print(self.access_info[ip])
while True:
if time.time() - self.access_info[ip][-1] > 60:
self.access_info[ip].pop()
break
print(self.access_info[ip])
if len(self.access_info[ip]) > 3:
return False
self.access_info[ip].insert(0, time.time())
return True
def wait(self):
"""
当前时间 - 列表第一次访问的时间 = 0 1 2 ↑
60 - (当前时间 - 列表第一次访问的时间 0 1 2) = 60 59 58 ↓
"""
return 60 - (time.time() - self.access_info[self.ip][-1])
2. API接口文档
rest_framework 可以自动生成接口文档, 文档以网页方式呈现.
1. 安装依赖库coreapi
2. 继承自APIView及其子类的视图.
* 1. 安装依赖库
pip install coreapi
* 2. 在总路由中设置接口文档访问路径.
from rest_framework.documentation import include_docs_urls
url('^docs/', include_docs_urls(title='站点页面标题'))
* 3. 继承APIView类及其子类
1. 继承APIView在, 在视图类的请求方法的文档字符串中写帮助信息.
class BookAPIView(APIView):
def get(...):
"""
获取所有书籍信息
"""
2. 单一方法的视图, 可以直接在视图类的文档字符串中写帮助信息.
class BookListView(ListAPIView):
"""
返回所有的图书信息
"""
3. 多方法的图书, 在视图类的文档字符串中, 为每个方法定义帮助信息.
class BookListCreateView(ListCreateAPIView):
"""
get: 返回所有图书信息
post: 新建图书
"""
4. 对于数图集ViewSet, 在视图类的文档字符串中定义, 使用action的名称区分.
class BookViewSet(ListModelMinxin, ...):
"""
list: 返回图书列表数据
retrieve: 返回图书详情数据
latest: 放回最新的图书数据
...
"""
* 4. 在配置文文件这添加core接口
AutoSchema' object has no attribute 'get_link
REST_FRAMEWORK = {
'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema'
}
* 5. 需要session的信息, 执行数据库迁移命令, 否则会报错 no such table:django_session
python manage.py makemigrations
python manaage.py migrate
* 6. 浏览器中访问 127.0.0.1:8000/docs/ 便可以看见自动生成的接口文档.

视图集的retrieve名称, 在接口文档网站叫做read
参数的Description需要在模型类或序列化器的字段中以help_text中定义
3. JWT
jwt: Json Web token
3.1 原理
1. jwt分三段式: 头.体.签名 (head.payload.sgin)
2. 头个体的可逆加密, 让服务器可以反解析出user对象, 前签名是不可逆加密, 保证整个token的安全性.
3. 头体签名三部分, 都是采用json格式的字符串进行加密, 可逆加密一般采用base64算法,
不可逆加密一般采用hash(例: md5)算法.
4. 头中的内容是基本信息: 公司信息, 项目信息. token采用的加密方式信息
{
"company": "公司信息"
...
}
5. 体中的内容是关键信息: 用户主键, 用户名, 签发时客户端信息(设备号, 地址), 过期时间
{
"uese_id": 1,
...
}
6. 签名中内容安全信息: 头的加密结果 + 体的加密结果 + 服务器不对外公开的安全码 进行md5加密
{
"head": "头的加密结果",
"payload": "体的加密结果",
"secret_key": "安全码"
}
3.2 签发/校验
签发: 根据登入请求提交的账户+密码+设备信息 签发token
用基本信息存储json字典, 采用base64算法加密得到 头字符串
用关键信息存储json字段, 采用base64算法加密得到 体字符串
用头, 体的加密字符串再加安全存储到json字典, 采用hash md5算法加密得到 签名字符串
账户密码能够根据User表得到user对象, 形成三段字段串用.拼接成token返回给前端
校验:根据客户端带token的请求反解出user对象
将token按.拆分成三端
第一段 头的加密信息, 一般不需要做任何处理
第二段 体的加密字符串, 要反解析出用户主键, 通过主键从User表中得到登入的用户, 过期时间和设备信息都是安全信息, 确保token没过期, 且是同一个设备发送的请求
再用 第一段 + 第二段 + 服务器安全密码 通过不可逆md5加密 与第三端签名字符串进程碰撞校验, 校验成功
后才能第二段校验得到user对象就是合法的登入用户
3.3 简单使用
1. 用账户/密码访问登入接口, 登入接口逻辑中调用签发token算法, 得到token, 返回给客户端, 并保存到cookie中
2. 校验token算法因该写在认证类中, 反解析出数据去数据库中校验得到user对象, 将对象放回即可,
源码中保存到requesr.user中, 全局配置给认证组件, 所有视图类请求都进行校验, 所有请求携带token访问.
* 登入接口的认证与权限局部禁用
1. 安装模块
* 1. rest_framework 有对应的jwt模块.
安装: pip install djangorestframework_jwt
2.继承内置模型表
* 2. 继承AbstractUser内置用户表表, 拓展两个字段
项目中一开始没继承AbstractUser表, 之后再继承执行生成表记录命令就会报错!
1. 新建项目

2. 解决templates路径问题, 将正常正常响应与异常响应代码复制过来. 将rest_framework注册.
修改语言/时区
INSTALLED_APPS = [
...
'rest_framework',
]
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
3. 继承AbstractUser写表模型
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
phone = models.CharField(max_length=11, verbose_name='手机号')
icon = models.ImageField(upload_to='icon')
在settings.py中,添加使用UserInfo的配置选项:
AUTH_USER_MODEL = 'app名.扩展表的表名'
AUTH_USER_MODEL = 'app01.UserInfo'
4. 新建媒体文件夹, 并设置媒体文件存放路径
MEDIA_URL = '/media/'
MEDIA_ROOT = Path(BASE_DIR, 'media')
5. 创建表的命令:
python manage.py makemigrations
python manage.py migrate
3. 创建超级用户
PS F:\synchro\Project\DRF_JWT> python manage.py createsuperuser
用户名: root
电子邮件地址: 136@qq.com
Password: zxc123456
Password (again): zxc123456
这个密码太常见了。
Bypass password validation and create user anyway? [y/N]: y

4. 测试路由
使用rest_framework_jwt提供的视图类
三个可用的视图类:
ObtainJSONWebToken, VerifyJSONWebToken, RefreshJSONWebToken -继承-> JSONWebTokenAPIView
一个基类:
JSONWebTokenAPIView -继承-> APIView

由于在模块中, 视图类执行了.as_view()方法, 得到一个函数的内存地址给变量名, 在路由中直接使用函数名即可.
obtain_jwt_token方法中有登入校验, 登入成功之后, 返回token.
from django.contrib import admin
from django.urls import path, re_path
from rest_framework_jwt.views import obtain_jwt_token
urlpatterns = [
path('admin/', admin.site.urls),
re_path(r'^login/', obtain_jwt_token)
]
5.登入
使用POST请求提交账户/密码进行登入: 127.0.0.1:8000/login

登入成功之后, 返回token.
第一次登入...
{
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InJvb3QiLCJleHAiOjE2NTA5NzM2NTYsImVtYWlsIjoiMTM2QHFxLmNvbSJ9.EcLM6P9PhEYgxHA4VmNh4zvKoU2ITIuFrsvv9ZyTMwk"
}
第二次登入...
{
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InJvb3QiLCJleHAiOjE2NTA5NzM5MjQsImVtYWlsIjoiMTM2QHFxLmNvbSJ9.caseIhIg7VWNiSRhrJOPmzc0gTJ_ilo_b4sgGwoJoKk"
}
obtain_jwt_token方法中将登入时间作为第二端的一个参数, 每次登入第二段的信息一定会表.
第二段的信息发生了变化, 第三段的签名信息一定会变.
3.4 JSONWebTokenAuthentication源码
在访问的时候携带的token值要以JWT空格开头...
如果不按这个要求不做校验, 直接可以访问...
JSONWebTokenAuthentication 继承 BaseJSONWebTokenAuthentication
BaseJSONWebTokenAuthentication有两个方法:
authenticate 认证方法
authenticate_credentials 数据token校验方法
JSONWebTokenAuthentication有主要方法:
get_jwt_value 获取token的值
class BaseJSONWebTokenAuthentication(BaseAuthentication):
def authenticate(self, request):
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(payload)
return (user, jwt_value)
def authenticate_credentials(self, payload):
User = get_user_model()
username = jwt_get_username_from_payload(payload)
if not username:
msg = _('Invalid payload.')
raise exceptions.AuthenticationFailed(msg)
try:
user = User.objects.get_by_natural_key(username)
except User.DoesNotExist:
msg = _('Invalid signature.')
raise exceptions.AuthenticationFailed(msg)
if not user.is_active:
msg = _('User account is disabled.')
raise exceptions.AuthenticationFailed(msg)
return user
class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
www_authenticate_realm = 'api'
def get_jwt_value(self, request):
auth = get_authorization_header(request).split()
auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
if not auth:
if api_settings.JWT_AUTH_COOKIE:
return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
return None
if smart_text(auth[0].lower()) != auth_header_prefix:
return None
if len(auth) == 1:
msg = _('Invalid Authorization header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid Authorization header. Credentials string '
'should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
return auth[1]
规定authorization携带token

内置的校验程序

设置token前缀 JWT

3.5 携带token访问
* 1. 测试路由
from django.contrib import admin
from django.urls import path, re_path
from app01 import views
from rest_framework_jwt.views import obtain_jwt_token
urlpatterns = [
path('admin/', admin.site.urls),
re_path(r'^login/', obtain_jwt_token),
re_path(r'^test1/', views.Test1.as_view())
]
* 2. 测试视图类
from rest_framework.views import APIView
from utils.response import NormalResponse
class Test1(APIView):
def get(self, request):
return NormalResponse('Test1')
* 3. 添加认证, 局部配置
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
authentication_classes = [JSONWebTokenAuthentication, ]
from rest_framework.views import APIView
from utils.response import NormalResponse
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
class Test1(APIView):
authentication_classes = [JSONWebTokenAuthentication, ]
def get(self, request):
return NormalResponse('Test1')

* 4. 添加认证, 全局配置
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': ['rest_framework_jwt.authentication.JSONWebTokenAuthentication',]
}
3.6 自定义JWT认证类
# 导入内置的校验程序
from rest_framework_jwt.utils import jwt_decode_handler
BaseJSONWebTokenAuthentication有两个方法:
authenticate 认证方法
authenticate_credentials 数据token校验方法
继承 BaseJSONWebTokenAuthentication, 重写authenticate认证方法,
自己获取请求头的token值, 使用jwt_decode_handler方法对值进行校验, 解析出字第二端的用户信息-->dict
获取用户数据对象authenticate_credentials
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_jwt.utils import jwt_decode_handler
class JsonWenToken(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_value = request.META.get('HTTP_AUTHORIZATION')
if not jwt_value:
raise AuthenticationFailed('没有携带token')
jwt_value = jwt_value.encode('iso-8859-1)')
try:
user = jwt_decode_handler(payload)
print(user)
return user, None
except Exception as exc:
raise AuthenticationFailed(f'token校验失败!{exc}')
