0
点赞
收藏
分享

微信扫一扫

python项目,登录生成token,后面所有接口验证token,如果是fastapi搭建的项目应该怎么实现


在使用 FastAPI 框架搭建的 Python 项目中,实现登录生成 Token 并进行后续接口验证 Token 的功能相对比较简单。你可以通过使用 JSON Web Token(JWT)来实现这一功能。以下是一个简单的示例,演示了如何在 FastAPI 中实现这一功能:

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
import jwt
from jwt.exceptions import PyJWTError
from datetime import datetime, timedelta

# 密码加密相关配置
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 用于生成和验证 Token 的密钥
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 模拟用户数据库
fake_users_db = {
    "user@example.com": {
        "username": "user",
        "email": "user@example.com",
        "hashed_password": "$2b$12$yvR/JwDilUSUJq3hG9Ev9uIQ5fP38/6aOe7d57sSm3Af2tRUIJzKK",  # 密码是 "password"
    }
}

app = FastAPI()

# OAuth2密码认证模式,用于验证用户身份
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 获取用户信息的函数
def get_user(email: str):
    if email in fake_users_db:
        user_dict = fake_users_db[email]
        return user_dict

# 验证用户密码的函数
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

# 验证用户信息的函数
def authenticate_user(email: str, password: str):
    user = get_user(email)
    if not user:
        return False
    if not verify_password(password, user["hashed_password"]):
        return False
    return user

# 创建 Token 的函数
def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 登录接口
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["email"]}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# 保护的接口,需要验证 Token
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        if email is None:
            raise HTTPException(status_code=403, detail="Invalid token")
    except PyJWTError:
        raise HTTPException(status_code=403, detail="Invalid token")
    user = get_user(email)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return {"email": user["email"], "username": user["username"]}

在这个示例中,我们使用了 FastAPI 提供的 OAuth2PasswordBearer 来处理 Token 的验证。用户登录时会提供用户名和密码,服务器会验证密码的正确性,如果验证通过,则生成 Token 并返回给用户。用户在后续请求需要验证的接口时,需要在请求的 Header 中带上 Token,服务器会解析 Token 并验证其有效性。如果 Token 有效,则允许用户访问该接口,否则返回相应的错误信息。

你可以根据自己的实际需求来调整这个示例,并且根据需要添加更多的功能,比如权限管理等。


举报

相关推荐

0 条评论