Implementing a Minimal Distributed Job Scheduling System in Python with multiprocessing


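Introduction

Python's multiprocessing module supports more than local multi-processing: its managers submodule can also spread processes across multiple machines. A single server process can act as the scheduler and, communicating over the network, distribute tasks to many processes running on other machines.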
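
To get a feel for the managers submodule before building the scheduler, here is a minimal sketch that serves a queue and talks to it from a client inside a single script. The class names `QueueServer` and `QueueClient`, the authkey, and the choice to run the server in a background thread via `get_server().serve_forever()` are illustrative assumptions made for this example; the scheduler itself uses separate processes.

```python
import threading
from multiprocessing.managers import BaseManager
from queue import Queue

# Hypothetical names for this sketch; subclassing keeps the two
# registries separate from each other and from BaseManager itself.
class QueueServer(BaseManager):
    pass

class QueueClient(BaseManager):
    pass

shared_queue = Queue()

# Server side: expose the queue under the name 'get_queue'.
QueueServer.register('get_queue', callable=lambda: shared_queue)
# Client side: only the name is needed; the object lives on the server.
QueueClient.register('get_queue')

# Port 0 asks the OS for any free port; the real address is read back below.
server = QueueServer(address=('127.0.0.1', 0), authkey=b'demo').get_server()
threading.Thread(target=server.serve_forever, daemon=True).start()

# Connect the way a process on another machine would, over a socket.
client = QueueClient(address=server.address, authkey=b'demo')
client.connect()
remote_queue = client.get_queue()        # a proxy, not the queue itself

remote_queue.put('hello')                # pickled and sent to the server
received = shared_queue.get(timeout=5)   # the server-side queue now holds it
print(received)
```

The client never touches `shared_queue` directly: every call on `remote_queue` travels over the socket, which is what lets the same pattern work across machines.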

That made me wonder whether this module could be used to build a simple job scheduling system.

Implementation

Job

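First, create a Job class. To keep testing simple it holds only a job_id attribute; later it could also carry attributes such as the job status, the command to run, and the executing user.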

job.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

class Job:
    def __init__(self, job_id):
        self.job_id = job_id
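
As a sketch of where the Job class could go, the version below adds the attributes mentioned above. The class name `DetailedJob`, the field names `command`, `user`, and `status`, and the status strings are all assumptions for illustration. The one real constraint is that instances stay picklable, because objects are pickled whenever they pass through a manager queue; a module-level dataclass with plain fields satisfies that.

```python
from dataclasses import dataclass

@dataclass
class DetailedJob:
    """Hypothetical extension of Job; field names are illustrative."""
    job_id: int
    command: str = ''          # command the job would run
    user: str = ''             # user the job would run as
    status: str = 'pending'    # e.g. 'pending', 'running', 'finished'

job = DetailedJob(job_id=1, command='echo hello', user='alice')

# A slave could update the status before sending the job back.
job.status = 'running'
print(job)
```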


Master

The Master dispatches jobs and prints information about the jobs that have finished running.

master.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2
from multiprocessing.managers import BaseManager
from job import Job


class Master:

    def __init__(self):
        # Queue of jobs that have been dispatched
        self.dispatched_job_queue = Queue()
        # Queue of jobs that have finished
        self.finished_job_queue = Queue()

    def get_dispatched_job_queue(self):
        return self.dispatched_job_queue

    def get_finished_job_queue(self):
        return self.finished_job_queue

    def start(self):
        # Register both queues with the manager so they can be reached over the network.
        # Note: passing bound methods as callables assumes the manager's server
        # process is created with the fork start method.
        BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
        BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)

        # Listen on the port and start the server
        # (authkey must be a byte string on Python 3)
        manager = BaseManager(address=('0.0.0.0', 8888), authkey=b'jobs')
        manager.start()

        # Get the queues through the methods registered above
        dispatched_jobs = manager.get_dispatched_job_queue()
        finished_jobs = manager.get_finished_job_queue()

        # Dispatch 10 jobs at a time. The next 10 are dispatched once the
        # dispatched queue is empty, which can happen before the last job
        # of the batch has been reported as finished.
        job_id = 0
        try:
            while True:
                for i in range(0, 10):
                    job_id = job_id + 1
                    job = Job(job_id)
                    print('Dispatch job: %s' % job.job_id)
                    dispatched_jobs.put(job)

                while not dispatched_jobs.empty():
                    # Wait up to 60 seconds for a finished job (raises Empty on timeout)
                    job = finished_jobs.get(timeout=60)
                    print('Finished Job: %s' % job.job_id)
        finally:
            # Stop the manager's server process, e.g. after Ctrl-C
            manager.shutdown()

if __name__ == "__main__":
    master = Master()
    master.start()
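
The inner `while` loop in master.py stops as soon as the dispatched queue is empty, so the last job of a batch may still be running when the next batch goes out. If the intent is to wait for the whole batch, one option is to count results instead of polling `empty()`. The sketch below shows the idea with plain in-process queues, integers standing in for Job objects, and a worker thread standing in for the slaves; `dispatch_batch`, `BATCH_SIZE`, and `fake_slave` are names invented for this example.

```python
import threading
from queue import Queue

BATCH_SIZE = 10  # assumed batch size, matching the 10 jobs per round above

def fake_slave(dispatched, finished):
    """Stand-in for a slave: move each job from one queue to the other."""
    while True:
        job_id = dispatched.get()
        if job_id is None:      # sentinel: stop the worker
            break
        finished.put(job_id)

def dispatch_batch(dispatched, finished, first_id, timeout=60):
    """Dispatch BATCH_SIZE jobs, then block until all of them are back."""
    for job_id in range(first_id, first_id + BATCH_SIZE):
        dispatched.put(job_id)
    # Count results rather than polling dispatched.empty().
    return [finished.get(timeout=timeout) for _ in range(BATCH_SIZE)]

dispatched, finished = Queue(), Queue()
worker = threading.Thread(target=fake_slave, args=(dispatched, finished))
worker.start()

done = dispatch_batch(dispatched, finished, first_id=1)
print(done)

dispatched.put(None)   # tell the worker to exit
worker.join()
```

The same counting loop should carry over to the manager proxies, since they expose the same `put` and `get` methods as the underlying queues.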

Slave

The Slave runs the jobs dispatched by the master and sends the results back.

slave.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2
from multiprocessing.managers import BaseManager
from job import Job  # job.py must be importable here so received Job objects can be unpickled


class Slave:

    def start(self):
        # Register the names of the two queue accessors; the queues themselves live on the master
        BaseManager.register('get_dispatched_job_queue')
        BaseManager.register('get_finished_job_queue')

        # Connect to the master
        server = '127.0.0.1'
        print('Connect to server %s...' % server)
        manager = BaseManager(address=(server, 8888), authkey=b'jobs')
        manager.connect()

        # Get the queues through the methods registered above
        dispatched_jobs = manager.get_dispatched_job_queue()
        finished_jobs = manager.get_finished_job_queue()

        # Run jobs and return the results. The run is only simulated here,
        # so the result sent back is the job that was received.
        while True:
            try:
                job = dispatched_jobs.get(timeout=1)
            except Empty:
                # No job arrived within 1 second; keep polling
                continue
            print('Run job: %s ' % job.job_id)
            time.sleep(1)
            finished_jobs.put(job)

if __name__ == "__main__":
    slave = Slave()
    slave.start()
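
The slave only simulates work with `time.sleep(1)`. If jobs carried a command, as suggested for the Job class, the slave could run it with the standard `subprocess` module. The helper `run_command` below is hypothetical and only sketches that step; the example invokes the current Python interpreter so that it does not depend on any particular shell.

```python
import subprocess
import sys

def run_command(args, timeout=30):
    """Run a command (a list of arguments) and return (exit_code, output).

    Hypothetical helper: a real slave would likely also record
    details such as the start time and the executing user.
    """
    result = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,   # fold stderr into the captured output
        universal_newlines=True,    # decode bytes to str
        timeout=timeout,            # raises TimeoutExpired if exceeded
    )
    return result.returncode, result.stdout

# Example: a "job" that just prints a line.
code, output = run_command([sys.executable, '-c', "print('job done')"])
print(code, output.strip())
```

The exit code and captured output could then be stored on the job object before it is put on the finished queue.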


Testing

Open three Linux terminals. Run the master in the first one and a slave in each of the second and third. The output is as follows:

master


$ python master.py
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30


slave1

$ python slave.py
Connect to server 127.0.0.1...
Run job: 1
Run job: 2
Run job: 3
Run job: 5
Run job: 7
Run job: 9
Run job: 11
Run job: 13
Run job: 15
Run job: 17
Run job: 19
Run job: 21
Run job: 23

slave2

$ python slave.py
Connect to server 127.0.0.1...
Run job: 4
Run job: 6
Run job: 8
Run job: 10
Run job: 12
Run job: 14
Run job: 16
Run job: 18
Run job: 20
Run job: 22
Run job: 24

