0
点赞
收藏
分享

微信扫一扫

Python多进程或者多线程环境中通过redis锁保证对公共资源操作的一致性

Redis锁可以用于保证对公共资源的操作的一致性。在并发环境中,多个线程或进程可能同时访问同一个资源,这可能会导致资源的不一致性,因此需要一种机制来保证操作的原子性和互斥性。

Redis提供了一种分布式锁的实现方式,可以在多个进程之间实现资源的互斥访问。使用Redis锁的基本思路是,在需要访问共享资源的代码段前后添加加锁和释放锁的代码。


现在通过Python Multiprocess多进程模拟多个进程同时访问同一资源,然后通过redis来实现对公共资源操作的一致性,下面是代码示例:

import redis
import time
from multiprocessing import Process

redis_conn = redis.Redis(host='10.0.0.55', port=6379, db=0)
lock_key = 'resource_lock'
lock_value = 'locked'


def acquire_lock():
    while not redis_conn.setnx(lock_key, lock_value):
        time.sleep(0.1)


def release_lock():
    redis_conn.delete(lock_key)


def update_work(opter):
    """
    :param opter:
    :return:
    """
    print("当前操作人: {}".format(opter))
    print("拉代码,git clone操作")
    time.sleep(5)
    print("打包压缩上传目标服务器执行安装替换")
    print("更新操作完毕")


def operate_resource(opter):
    acquire_lock()
    # 操作公共资源
    update_work(opter)
    release_lock()


def worker(opter):
    while True:
        operate_resource(opter)
        time.sleep(1)


if __name__ == '__main__':
    processes = []
    # 模拟操作人员,假设对update_work都有执行权限
    opters = ['张三', '李四', '王五']
    for i, opter in enumerate(opters):
        p = Process(target=worker, args=(opter,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()


执行结果:

Python多进程或者多线程环境中通过redis锁保证对公共资源操作的一致性_一致性


flask实战项目中有个需求,假设现在有多个人对更新或热更操作都有权限,第一个人点执行了,然后没有和其他人说,结果其他人也点执行了,这样就会造成抢占,如果代码中没有加锁的话,那么会造成任务被多次执行的情况。

解决示例:

@wf.before_request
def before_request():
    """
    钩子函数,wf请求前
    """
    # noinspection PyBroadException
    try:
        opt_method = str(request.url_rule).split("/")[1]
    except:
        opt_method = ""
    # noinspection PyBroadException
    try:
        r_type = str(request.url_rule).split("/")[2]
    except:
        r_type = ""
    if opt_method == "wf" and r_type == "run":
        proj_name = request.view_args["project"]
        tid = request.view_args["tid"]
        r = OptDB.qproject_byname(proj_name)
        proj_id = r.get("proj_id", None)
        t_obj = OptTasks.qTaskbytid(tid, proj_id)
        opt_type = json.loads(t_obj.info).get("opt", '')
        if opt_type == "update" or opt_type == "hotfix":
            redis_connect = redis.Redis(host=configs[code_env].REDIS_HOST, port=configs[code_env].REDIS_PORT,
                                        password=configs[code_env].REDIS_PW, db=configs[code_env].REDIS_DB)
            if not t_obj.status:
                if redis_connect.get(tid) is not None:
                    return {"success": False, "msg": "更新或热更请求频繁"}

                redis_connect.set(tid, opt_method)

before_request装饰器表明before_request函数是一个钩子函数,意思就是 注册一个函数,在每次请求之前运行,示例上的意思就是每次请求wf蓝图前运行before_request函数里内容,当我们操作类型为update或者hotfix的时候,连接redis,REDIS_HOST,REDIS_PORT, REDIS_PW, REDIS_DB这些redis连接信息是配置在config文件里的,此处的关键点就在于正式执行操作前,我会判断redis里面key为tid这个变量里面有没有值,如果有值那就直接阻塞,返回False,最后再set塞进关于tid这个key的信息


举报

相关推荐

0 条评论