import datetime
import hashlib
import hmac
import os
from functools import wraps

from flask import (
    Flask, render_template, redirect, url_for, request,
    make_response, Response, current_app
)
from flask_jwt_extended import (
    JWTManager,
    jwt_required,
    verify_jwt_in_request,
    create_access_token,
    create_refresh_token,
    get_jwt_identity,
    get_current_user,
    set_access_cookies,
    set_refresh_cookies,
    unset_refresh_cookies,  # refresh token
    unset_access_cookies,  # access token
    unset_jwt_cookies,  # clears both the access token and the refresh token
)
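app = Flask(__name__)
# The signing key for both Flask sessions and (via flask_jwt_extended's
# fallback to SECRET_KEY) the JWTs. It is read from the SECRET_KEY environment
# variable when present; the literal below is kept only so local runs without
# the variable behave as before.
# NOTE(review): the literal is a placeholder committed to source; any deployed
# instance must supply SECRET_KEY from the environment or a secret store.
app.secret_key = os.environ.get(
    "SECRET_KEY", "ghakjhkghkahkhgkhalkfdngkasnkglhaj"
).encode('utf-8')
# Tokens are carried in cookies only (set/unset_*_cookies below).
# NOTE(review): with cookie transport flask_jwt_extended applies its own
# defaults for JWT_COOKIE_SECURE (off) and JWT_COOKIE_CSRF_PROTECT (on);
# neither is set here, so confirm they match the deployment's TLS and
# form-posting expectations.
app.config['JWT_TOKEN_LOCATION'] = ["cookies"]
jwt = JWTManager(app)  # initialize JWTManager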
# Expired-token handler.
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    """Redirect to the login page when flask_jwt_extended reports an expired token.

    The explanation is passed as a ``message`` query argument; note that
    ``login()`` as written in this file does not read it.
    """
    target = url_for('login', message="Session expired, please log in again.")
    return redirect(target)
# Invalid-token handler (malformed or badly signed token).
@jwt.invalid_token_loader
def invalid_token_callback(error):
    """Redirect to the login page when the presented token fails validation."""
    target = url_for('login', message="Invalid token, please log in again.")
    return redirect(target)
# Missing-token handler.
@jwt.unauthorized_loader
def missing_token_callback(error):
    """Redirect to the login page when a protected route is hit with no token."""
    target = url_for('login', message="Token missing, please log in.")
    return redirect(target)
# Revoked-token handler.
@jwt.revoked_token_loader
def revoked_token_callback(jwt_header, jwt_payload):
    """Redirect to the login page when a token is reported as revoked.

    NOTE(review): no blocklist check (``token_in_blocklist_loader``) is
    registered in this file, so this path is presumably never taken unless
    one is added elsewhere.
    """
    target = url_for('login', message="Token revoked, please log in again.")
    return redirect(target)
@app.route('/')
@app.route('/index', methods=["GET", "POST"])
@jwt_required(fresh=True)
def index():
    """Render the home page; requires a *fresh* access token cookie.

    The template receives the token's identity (the username stored at login).
    """
    identity = get_jwt_identity()
    context = {"info": identity}
    return render_template('index.html', **context)
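def _credentials_valid(username: str, password: str) -> bool:
    """Return True when the submitted credentials match the demo account.

    Both fields are compared with ``hmac.compare_digest`` on UTF-8 bytes so
    the time taken does not depend on how many leading characters match.
    NOTE(review): the account is hard-coded for demonstration; a real
    deployment would look the user up and verify a salted password hash.
    """
    name_ok = hmac.compare_digest(username.encode('utf-8'), b"root")
    pass_ok = hmac.compare_digest(password.encode('utf-8'), b"123")
    return name_ok and pass_ok


@app.route('/login', methods=["GET", "POST"])
def login():
    """Show the login form (GET) or validate it and issue JWT cookies (POST).

    On a valid POST a fresh access token and a refresh token are minted for
    the username and written as cookies on a redirect to ``index``. Any
    validation failure re-renders the form with an ``errors`` message.
    """
    if request.method != "POST":
        return render_template('login.html')

    username = request.form.get('username', None)
    password = request.form.get('password', None)
    confirm_password = request.form.get('confirm_password', None)

    # Form validation.
    if not username or not password or not confirm_password:
        return render_template('login.html', errors="所有字段不能为空")
    if password != confirm_password:
        return render_template('login.html', errors="密码不一致")

    if not _credentials_valid(username, password):
        return render_template('login.html', errors="账号或密码有误")

    # Only the username is stored as the token identity.
    access_token = create_access_token(identity=username, fresh=True)  # fresh=True marks a recent login
    refresh_token = create_refresh_token(identity=username)

    # Attach both tokens as cookies and send the client to the home page.
    # max_age is left at the library default (see the commented-out
    # alternatives in the original: 7 days or a timedelta).
    response = redirect(url_for('index'))
    set_access_cookies(response, access_token)
    set_refresh_cookies(response, refresh_token)
    return response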
@app.route('/logout', methods=["GET", "POST"])
@jwt_required(fresh=True)
def logout():
    """Clear both JWT cookies and redirect to the login page.

    Only the client-side cookies are removed (``unset_jwt_cookies`` covers
    the access and refresh cookies together). Nothing in this file revokes
    the tokens server-side, so presumably a copy of either token stays
    valid until it expires.
    """
    login_url = url_for('login')
    response = redirect(login_url)
    unset_jwt_cookies(response)
    return response
# Refresh the access token; the request must carry a valid refresh token.
@app.route('/refresh', methods=['GET', 'POST'])
@jwt_required(refresh=True)
def refresh():
    """Mint a new access token and a new refresh token from a refresh token.

    Both cookies are rewritten on the redirect to ``index``.
    NOTE(review): the new access token is created with ``fresh=True``, so the
    ``fresh=True`` requirement on ``/index``, ``/logout`` and ``/test`` can be
    met without re-entering a password; flask_jwt_extended's convention is to
    issue non-fresh tokens from a refresh flow and reserve fresh ones for a
    recent login. The previous refresh token is also not revoked by anything
    visible here.
    """
    identity = get_jwt_identity()  # identity carried by the refresh token
    tokens = {
        "access": create_access_token(identity=identity, fresh=True),
        "refresh": create_refresh_token(identity=identity),
    }
    response = redirect(url_for('index'))
    set_access_cookies(response, tokens["access"])
    set_refresh_cookies(response, tokens["refresh"])
    return response
@app.route('/test')
@jwt_required(fresh=True)
def test():
    """Smoke-test endpoint: returns a fixed success string to holders of a fresh token."""
    message = "测试成功"
    return message
if __name__ == '__main__':
    # debug=True enables the reloader and the interactive Werkzeug debugger,
    # which lets anyone who reaches the error page run code in the process;
    # keep this to local development.
    debug_mode = True
    app.run(debug=debug_mode)