0
点赞
收藏
分享

微信扫一扫

从零开始构建报警中心:part10 接入统一登录

独兜曲 2023-12-21 阅读 37

前文使用flask-login实现了用户登入登出,在实际使用中过于简单,这里引入统一登录。关于统一登录可以看一下早前的这篇文档:一个简单的SSO统一登录设计

对于接入统一登录可以参考一下的时序图:


从零开始构建报警中心:part10 接入统一登录_flask

根据这个时序图,要接入已有的SSO,需要对现有系统进行一些修改:

  1. 未登录情况下,在login视图中添加从cookie或header中检测Authorization。并根据Authorization是否存在进行判断。
  1. 如果无Authorization,跳转至SSO进行登录
  2. 有Authorization,发送给SSO进行验证
  1. 已登录状态下,验证Authorization是否合法,如果不合法重新登陆。

相关代码如下

在本实例中

SSO的域名为sso.xxx.com,自身应用域名为t.xxx.com,因为在SSO中设定了cookie作用域为xxx.com。

其他代码逻辑,例如访问SSO的/attach/valid验证token,处理flask_login的next参数等均在时序图中有所描述。

使用到了redis存储flask-login中所使用的User对象,并在logout时从redis中移除该条数据,用以实现load_user返回None。


from flask import Flask, redirect, render_template, request, session, url_for
from flask_login import LoginManager, login_user, UserMixin, current_user, login_required, logout_user
import pickle
import requests
from redis import Redis

app = Flask(__name__, template_folder="templates")
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'  # 使用SQLite数据库,可以根据需要更改

db.init_app(app)  # 初始化绑定DB 与 Flask
login_manager = LoginManager(app)  # Setup a Flask-Login Manager
login_manager.login_view = "login"  # required_login 校验不通过,默认跳转

# 用戶登錄Redis配置
user_redis_con = Redis(
    host='127.0.0.1',  # 服务器地址
    port=6379,  # 服务器端口
    db=10
    )


class User(UserMixin):
    """
    flask-login 必要 User类
    """
    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return self.username

    def get_id(self):
        return self.username


# 用户加载函数
# 使用redis存储User对象,如果Redis中没有,则认为未登录,返回None
@login_manager.user_loader
def load_user(id):
    """
    flask-login必要,重要的回调函数
    从redis中返回User类对象,或者None
    :param id:
    :return:
    """

    redis_id = user_redis_con.get(id)
    if redis_id is None:
        return None
    else:
        try:
            user = pickle.loads(redis_id)  # 使用 pickle.loads 方法 将 存储在redis中的序列化对象读出来,还原成User对象
            return user
        except Exception as E:
            return None


# 定义错误处理函数,捕获404错误
@app.errorhandler(404)
def page_not_found(error):
    # 重定向到根路由
    return redirect(url_for('index'))

# 定义根路由
@app.route('/')
@login_required
def index():
    user = current_user
    return '欢迎你, ' + user.username


@app.route('/login', methods=["GET", "POST"])
def login():
    """
    接入统一登录
    未登录状态的所有请求都会被重定向到此URL
    如果请求参数中没有token,则会重定向到sso进行登录操作。
        通过@login_required 撞过来的请求,会带着next参数,表明来源url。
        如果有next参数则将来源URL作为n参数挂在redirectUrl中,用以回跳到登录前页面。(中间跳到了SSO,所以这段有点复杂)
    如果请求参数中有token,则将token放入header Authorization中发送给sso进行验证会验证。
        验证token通过后,直接登录。否则返回提示。
    :return:
    """
    token = request.cookies.get('Authorization', None)\
        if request.cookies.get("Authorization", None) \
        else request.headers.get("Authorization", None)
    if token:
        print("token   " + token)
        # token 参数存在,向SSO请求验证。
        headers = {'Authorization': token}
        attach_valid_url = "http://sso.xxx.com/attach_valid"
        r = requests.get(attach_valid_url, headers=headers)  # 将token放入header  Authorization中发送给sso进行验证。
        data = r.json()
        if data["code"] == 1001:
            username = data["result"]["username"]
            user = User(username)
            # 保存用户信息
            user_dump = pickle.dumps(user)  # 使用 pickle.dumps 方法 将 User 对象序列化存储在redis中
            user_redis_con.set(username, user_dump)
            # 登录操作
            login_user(user)
            # 处理登录路由
            n = request.args.get('next')
            print(n)
            if n:
                return redirect(n)
            else:
                return redirect(url_for('index'))
        else:
            n = request.args.get('next')
            # @login_required 会自动添加一个next parma。如果有这个param 则给redirectUrl加上一个n参数,然后把n挂在上面。
            if n:
                return redirect("%s/login?redirectUrl=%s/login?next=%s" % (
                    'http://sso.xxx.com', 'http://t.xxx.com', n))
            else:
                return redirect("%s/login?redirectUrl=%s/login" % ('htto://sso.xxx.com', 'http://t.xxx.com'))
    else:
        print("无token")
        n = request.args.get('next')
        # @login_required 会自动添加一个next parma。如果有这个param 则给redirectUrl加上一个n参数,然后把n挂在上面。
        if n:
            return redirect("%s/login?redirectUrl=%s/login?next=%s" % (
                'http://sso.xxx.com', 'http://t.xxx.com', n))
        else:
            print("无next")
            return redirect("%s/login?redirectUrl=%s/login" % ('http://sso.xxx.com', 'http://t.xxx.com'))


@app.route('/logout')
@login_required
def logout():
    """
    退出登录
    执行logout_user()进行登出操作(flask-login)
    并删除redis缓存中的User类对象信息
    :return:
    """
    username = current_user.username
    # --start-- 本地系统logout
    logout_user()
    user_redis_con.delete(username)
    # --end--
    # # 重定向SSO进行logout操作
    return redirect("%s/logout" % 'http://sso.xxx.com')


if __name__ == '__main__':
    app.run(debug=True, port=80)


举报

相关推荐

0 条评论