0
点赞
收藏
分享

微信扫一扫

浅谈JWT和实战

兵部尚输 2023-05-12 阅读 36

在大多数前后端分离项目,更多的使用JWT来进行接口认证,接下来详细介绍:

一,认识JWT

1,什么是JWT

JWT(Json Web Token)是一种为了在网络应用环境间传递声明而执行的,基于JSON的开放标准,通过数字签名的方式,以JSON对象为载体,在不同的服务终端之间安全的传输信息

2,JWT的组成

JWT由头部(Header),负载(Payload),签名(Signature) 这3部分构成,其中每一部分都使用Base64编码处理。

Header:头部信息,主要包含两部分信息,类型,通常为"JWT",  算法名称,比如HSHA256,RSA等。

Payload:具体用户的信息,需要注意的是,该部分内容只经过Base64编码(相当于明文存储),所以不要在其中放置敏感信息。

Signature:签名信息,签名用于验证信息在传递过程中是否被更改。

JWT信息由3段构成,它们之间用圆点”.“连接,格式如下:

aaaa.bbbbb.ccccc

下面我使用了在线jwt解码网址JWT Token在线解析解码 - ToolTT在线工具箱解码出的图例:

浅谈JWT和实战_JWT

浅谈JWT和实战_JWT_02

3,JWT应用场景

JWT最常见的场景就是授权认证,一旦用户登录,后续每个请求都将包含JWT,系统在每次处理用户请求之前,都要先进行JWT安全校验,通过之后再进行处理


二,JWT实战

1,django基于DRF框架实现JWT认证

可以基于djangorestframework-jwt库来使用JWT进行身份验证

(1)安装

pip install djangorestframework-jwt

浅谈JWT和实战_json_03


(2)配置应用。

编写配置文件setting.py,配置如下:

浅谈JWT和实战_redis_04

浅谈JWT和实战_json_05


(3)配置路由,在项目目录的urls.py中添加如下代码:

from rest_framework_jwt.views import obtain_jwt_token
urlpatterns = [
    path('api-jwt-token-auth/', obtain_jwt_token),
]

执行结果如图:

浅谈JWT和实战_JWT_06

输入认证的账号密码后点POST,输出结果

{
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNjg0MTQ1MDcwLCJlbWFpbCI6IjExMUAxMTEuY29tIn0.vHP9_SjPcEUq2El26qU9r8AdymtUwyNq2cW3OtlXGws"
}


现在我们给接口添加权限认证

DRF内置的权限组件的配置由以下4种:

rest_frameword.permissions.AllowAny:默认用户对所有的接口都有操作权限,即不做权限限制

rest_frameword.permissions.IsAuthenticated:通过认证的用户才可以访问接口

rest_frameword.permissions.IsAdminUser:仅管理员用户才可以访问

rest_frameword.permissions.IsAuthenticatedOrReadOnly:未认证的用户只有查询的权限,经过认证的用户有增加,删除,修改,查询的权限

这里我们用rest_frameword.permissions.IsAuthenticated做全局配置

setting.py里配置DEFAULT_PERMISSION_CLASSES

浅谈JWT和实战_单点登录_07

配置后访问任意一接口会显示如下信息:

{
    "detail": "Authentication credentials were not provided."
}


接下来,我们使用JWT访问已经设置权限的接口,该如何使用呢

JWT的格式为:JWT + 两个空格 + 具体的Token,我这里使用apihost工具去执行接口调用测试

浅谈JWT和实战_redis_08

Header信息头中添加Authorization 值是JWT + 空格 + (之前调用api-jwt-token-auth得到的token值)


2,flask中使用JWT实现单点登录中的授权认证功能

单点登录整个请求流程图简单画下:

浅谈JWT和实战_JWT_09


其中我们使用JWT主要是用来ssourl+/login用户登录成功后生成token信息,第二是,提供一个token验证接口,其他系统访问每次请求如果请求头中的有Authorization中有token值的话去验证token,只有token验证成功后才能执行后面的逻辑否则将重新跳转ssourl+/login上

使用JWT应用在单点登录token生成及验证代码实现示例:

class TokenAuth:
    def __init__(self):
        env = getenv()
        # redis连接,用于操作User类.User类是专门给flask-login用的。删除User类中的数据就可以实现用户登出。
        self.user_con_redis = configs[env].user_con_redis
        # redis链接,用于存储密码版本。
        self.ver_con_redis = configs[env].ver_con_redis
        # self.logout_con_redis = configs[env].logout_con_redis

        self.logger = HGLog('HGauth.log', log_path='/home/hero/log')

    def gen_token(self, username, secret, ver):
        """
        生成认证Token
        :param ver:
        :param username: str
        :param secret: str
        :return: string
        """

        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=10800),
                'iat': datetime.datetime.utcnow(),
                'data': {
                    'username': username,
                    'ver': ver
                }
            }
            self.logger.info("生成token:" + username)
            return jwt.encode(payload, secret, algorithm='HS256')

        except Exception as e:
            self.logger.error("token生成异常 {} ".format(username) + str(e))
            return e

    def verify_token(self, token, secret):
        """
        验证Token 除了常规的校验,还要校验ver,以保证用户密码修改后强制重新登录
        """
        print('校验token')
        print(token)
        try:
            payload = jwt.decode(token, secret, algorithms=['HS256'])
            if 'data' in payload and 'username' in payload['data']:
                payload_username = payload['data']['username']
                payload_ver = payload['data']['ver']
                # 判断一下用户是否在登录状态
                if self.user_con_redis.get(payload['data']['username']):
                    pass
                else:
                    return {"code": 2005, "message": "user is logout", "result": None, "success": False}
                ver = self.get_ver(payload_username)   # 校验payload中data字典中的ver。
                if ver == payload_ver:
                    self.logger.info("校验token: " + payload['data']['username'] + ',payload_ver: ' + str(payload_ver))
                    try:  # 查询用户接口的返回数据,捕获异常,返回2005错误码。
                        q = DataQuery()
                        success, userinfo = q.queryLdapDinginfo(payload_username)
                        if not success:
                            return {{"code": 2006, "message": userinfo, "result": None, "success": False}}

                        username = payload_username
                        nickname = userinfo['nickname']
                        dept = userinfo['full_dept']

                        # 以上是获取token jwt中的用户信息,
                        # 根据用户信息需要判断用户是否被锁 todo 暂时不处理
                        # p = ssoPW()
                        # l = p.lock_status(userName)
                        # if l['success']:  # lock_status 返回True 说明账号被锁,不可登录;返回False 说明账号未被锁定,可以使用
                        #     return {"code": 2006,
                        #             "message": "Account has been locked",
                        #             "result": None,
                        #             "success": False}
                        # # 如果用户没有被锁,继续以下流程

                        if 'uid' in userinfo:
                            uid = userinfo['uid']
                        else:
                            uid = None

                        result = {
                            'username': username,  # 规范返回值 都是用小写key
                            'dept': dept,
                            'nickname': nickname,  # 规范返回值 都是用小写key
                            'uid': uid}
                        # # 是否为雇员 todu 额外功能 不处理
                        # if 'warning' in k['data'][0]:
                        #     warning = k['data'][0]['warning']
                        #     result['warning'] = warning
                        #     result['employee'] = False
                        # else:
                        #     result['employee'] = True
                        # # 是否为机器人
                        # if 'warning2' in k['data'][0]:
                        #     warning2 = k['data'][0]['warning']
                        #     result['warning2'] = warning2
                        #     result['robot'] = True
                        # else:
                        #     result['robot'] = False
                        # # 记录非员工用户
                        # if result['robot'] or not result['employee']:
                        #     self.logger.info('非员工验证'+str(result))

                        return {"code": 1001, "message": "成功",
                                "result": result,
                                "success": True}
                    except Exception as e:
                        self.logger.error(str(e))
                        return {"code": 2005,
                                "message": "Internal Server Error. Please contect SSO developer",
                                "result": None,
                                "success": False}
                    #
                    # return {"code": 1001, "message": "成功",
                    #         "result": {"userName": userName,           # 历史遗留大小写混用,保持兼容
                    #                    'username': userName,           # 规范返回值 都是用小写key
                    #                    'dept': dept,
                    #                    'displayName': displayName,     # 历史遗留大小写混用,保持兼容
                    #                    'displayname': displayName,     # 规范返回值 都是用小写key
                    #                    'uid': uid},
                    #         "success": True}
                else:  # ver 校验未通过
                    print(ver)
                    print(payload_username, payload_ver)
                    return {"code": 2004, "message": "password changed", "result": None, "success": False}
            else:
                raise jwt.InvalidTokenError
        except jwt.ExpiredSignatureError:
            # token过期
            return {"code": 2001, "message": "expired token", "result": None, "success": False}
        except jwt.InvalidTokenError:
            # token无效
            return {"code": 2003, "message": "invalid token", "result": None, "success": False}

    def get_ver(self, username):
        """
        从redis中读取中
        :param username:
        :return:
        """
        data = self.ver_con_redis.get(username)
        # 先判断是否存在username这个记录,如果没有直接返回0
        if data is None:
            return 0
        data = pickle.loads(data)
        return data["ver"]

    def set_ver(self, username, ver):
        """
        操作redis,设置ver
        :param username:
        :param ver:
        :return:
        """
        data = {"ver": ver}
        self.ver_con_redis.set(username, pickle.dumps(data))

其中gen_token就是用户通过ssourl+/login登录成功后生成token,将token存入到headers和cookie中

verify_token函数为单点登录系统对外提供调用的验证token的函数

编写验证token接口

@app.route('/attach_valid')
def attach_valid():
    """
    校验token
    :return:
    """
    token = request.headers.get('Authorization')
    SSO_SECRET = configs[env].sso_secret
    hglog.info('attach_valid' + str(token))  # token 可能为空
    if token:
        ta = TokenAuth()
        data = ta.verify_token(token, SSO_SECRET)
        print(data)
        if data['code'] == 1001:
            username = data['result']["username"]
            # if nonem_block(username=username, sip=realip):
            #     return abort(403)
            res = make_response(jsonify(data), 200)
        else:  # code非1001 认为异常 返回码202
            res = make_response(jsonify(data), 202)
        hglog.info(token + "-/attach_valid")
        return res
    else:
        data = {"code": 2003, "message": "no token", "result": None, "success": False}
        res = make_response(jsonify(data), 202)
        hglog.info(request.remote_addr + "-" + 'no_token' + "-/attach_valid")
        return res

简单的示范,如果需要详细咨询Python关于单点登录的实现的话,请联系作者博主:quietguoguo的博客_SSL,nginx,python_51CTO博客


举报

相关推荐

0 条评论