0
点赞
收藏
分享

微信扫一扫

PostgreSQL数据库高可用——AsyncExecutor类


AsyncExecutor类封装了异步线程启动结束管理的API,用于ha.py中启动线程执行操作任务,从而不阻碍主线程执行DCS监控任务。如下为其构造函数,_scheduled_action为任务名称,_finish_event为事件,

def __init__(self, cancellable, ha_wakeup):
self._cancellable = cancellable # 是否可以取消
self._ha_wakeup = ha_wakeup
self._thread_lock = RLock()
self._scheduled_action = None # 任务名称,字符串类型
self._scheduled_action_lock = RLock()
self._is_cancelled = False # 是否取消
self._finish_event = Event()
self.critical_task = CriticalTask()

Python3 线程中常用的两个模块为:​​_thread​​和threading(推荐使用)。thread 模块已被废弃。用户可以使用 threading 模块代替。所以,在 Python3 中不能再使用"thread" 模块。为了兼容性,Python3 将 thread 重命名为 “_thread”。但是Parition使用six能兼容这个问题,所以这里还是可以使用thead模块。

run_async

def run_async(self, func, args=()):
Thread(target=self.run, args=(func, args)).start()

Python 中,有关线程开发的部分被单独封装到了模块中,和线程相关的模块有以下 2 个:

  • ​_thread​​:是 Python 3 以前版本中 thread 模块的重命名,此模块仅提供了低级别的、原始的线程支持,以及一个简单的锁。功能比较有限。正如它的名字所暗示的(以 _ 开头),一般不建议使用 thread 模块;
  • threading:Python 3 之后的线程模块,提供了功能丰富的多线程支持,推荐使用。

Python 主要通过两种方式来创建线程:

  • 使用 threading 模块中 Thread 类的构造器创建线程。即直接对类 threading.Thread 进行实例化创建线程,并调用实例化对象的 start() 方法启动线程。
  • 继承 threading 模块中的 Thread 类创建线程类。即用 threading.Thread 派生出一个新的子类,将新建类实例化创建线程,并调用其 start() 方法启动线程。

Thread 类提供了如下的 init() 构造器,可以用来创建线程:​​__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)​​​ 此构造方法中,以上所有参数都是可选参数,即可以使用,也可以忽略。其中各个参数的含义如下:
group:指定所创建的线程隶属于哪个线程组(此参数尚未实现,无需调用);
target:指定所创建的线程要调度的目标方法(最常用);
args:以元组的方式,为 target 指定的方法传递参数;
kwargs:以字典的方式,为 target 指定的方法传递参数;
daemon:指定所创建的线程是否为后代线程。
下面程序演示了如何使用 Thread 类的构造方法创建一个线程:

import threading
#定义线程要调用的方法,*add可接收多个以非关键字方式传入的参数
def action(*add):
for arc in add:
#调用 getName() 方法获取当前执行该程序的线程名
print(threading.current_thread().getName() +" "+ arc)
#定义为线程方法传入的参数
my_tuple = ("http://c.biancheng.net/python/",\
"http://c.biancheng.net/shell/",\
"http://c.biancheng.net/java/")
#创建线程
thread = threading.Thread(target = action,args =my_tuple)

由此就创建好了一个线程。但是线程需要手动启动才能运行,threading 模块提供了 start() 方法用来启动线程。因此在上面程序的基础上,添加如下语句:​​thread.start()​​ 再次执行程序,其输出结果为:

Thread-1 http://c.biancheng.net/python/
Thread-1 http://c.biancheng.net/shell/
Thread-1 http://c.biancheng.net/java/

可以看到,新创建的 thread 线程(线程名为 Thread-1)执行了 action() 函数。默认情况下,主线程的名字为 MainThread,用户启动的多个线程的名字依次为 Thread-1、Thread-2、Thread-3、…、Thread-n 等。

为了使 thread 线程的作用更加明显,可以继续在上面程序的基础上添加如下代码,让主线程和新创建线程同时工作:

for i in range(5):
print(threading.current_thread().getName())

再次执行程序,其输出结果为:

MainThreadThread-1 http://c.biancheng.net/python/

MainThreadThread-1 http://c.biancheng.net/shell/

MainThreadThread-1 http://c.biancheng.net/java/

可以看到,当前程序中有 2 个线程,分别为主线程 MainThread 和子线程 Thread-1,它们以并发方式执行,即 Thread-1 执行一段时间,然后 MainThread 执行一段时间。通过轮流获得 CPU 执行一段时间的方式,程序的执行在多个线程之间切换,从而给用户一种错觉,即多个线程似乎同时在执行。如果程序中不显式创建任何线程,则所有程序的执行,都将由主线程 MainThread 完成,程序就只能按照顺序依次执行。

​​http://c.biancheng.net/view/2603.html​​

run

run_async函数会创建线程并运行run成员函数,其参数为(func,args)。run成员函数其实就是包装函数,其主要就是运行func函数。

def run(self, func, args=()):
wakeup = False
try:
with self:
if self._is_cancelled: # 被主动取消
return
self._finish_event.clear()
self._cancellable.reset_is_cancelled()
# if the func returned something (not None) - wake up main HA loop
wakeup = func(*args) if args else func() # 运行主要函数,返回是否需要唤醒HA主流程
return wakeup
except Exception:
logger.exception('Exception during execution of long running task %s', self.scheduled_action)
finally:
with self:
self.reset_scheduled_action()
self._finish_event.set()
with self.critical_task:
self.critical_task.reset()
if wakeup is not None:
self._ha_wakeup()

try_run_async

try_run_async函数输入action任务名称、func运行函数,args参数。先将任务名称输入schedule函数判定任务是否已经注册运行,如果已经注册运行​​self._scheduled_action is not None​​​,则直接返回任务名;否则,将​​self._scheduled_action​​​设置为action任务名称,初始化​​self._is_cancelled​​​为False,设定​​_finish_event​​事件。

def try_run_async(self, action, func, args=()):
prev = self.schedule(action)
if prev is None:
return self.run_async(func, args)
return 'Failed to run {0}, {1} is already in progress'.format(action, prev)
def schedule(self, action):
with self._scheduled_action_lock:
if self._scheduled_action is not None:
return self._scheduled_action
self._scheduled_action = action
self._is_cancelled = False
self._finish_event.set()
return None

cancel

cancel函数首先获取_scheduled_action_lock,设置_is_cancelled为True,表明已经取消,调用CancellableExecutor的cancel函数,等待_finish_event事件。

def cancel(self):
with self:
with self._scheduled_action_lock:
if self._scheduled_action is None:
return
logger.warning('Cancelling long running task %s', self._scheduled_action)
self._is_cancelled = True
self._cancellable.cancel()
self._finish_event.wait()
with self:
self.reset_scheduled_action()

with语句在求出这个上下文管理器对象之后,自动执行进入这个对象的​​__enter__​​​方法,with结束后,自动调用​​__exit__​​​中定制的自动释放资源的机制完成清理工作,无须手动干预。详细内容参见​​Python入门之——with 类​​。AsyncExecutor类实现了with语句需要的enter和exit函数。

class AsyncExecutor(object):
def __enter__(self):
self._thread_lock.acquire() # _thread_lock是在构造函数中初始化的RLock锁,进入时获取
def __exit__(self, *args):
self._thread_lock.release() # 退出时释放


举报

相关推荐

0 条评论