0
点赞
收藏
分享

微信扫一扫

python - 多进程


问题导读:

  1. Process
  2. Process class
  3. Lock
  4. Semaphore
  5. Event
  6. Queue
  7. Pool

解决方案:



Process



#!/usr/bin/env python
# coding=utf8
import multiprocessing
import time


def sayHello(interval):
for i in range(5):
print 'The time is {0}'.format(time.ctime())
time.sleep(interval)

if __name__=='__main__':
p = multiprocessing.Process(target=sayHello, args=(3,))
p.start()

p1 = multiprocessing.Process(target=sayHello, args=(3,))
p1.start()

print 'pid:', p.pid, ' ', p1.pid
print 'name:' ,p.name, ' ', p1.name
print 'is_alive:', p.is_alive(), ' ', p1.is_alive()


Process Class



#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

class SayHello(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval

def run(self):
for i in range(4):
print 'time is {0} {1}'.format(time.ctime(),self.pid)
time.sleep(self.interval)

if __name__=='__main__':
for i in range(5):
p = SayHello(1)
p.start()


Lock



#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time
def sayHello(lock, i):
# 获取锁
lock.acquire()
print 'start:{0}'.format(time.ctime())
try:
with open('./data.txt','a') as f:
f.write(time.ctime() + ' id:' + str(i) + '\n')
finally:
# 释放锁
lock.release()
time.sleep(1)
print 'end:{0}'.format(time.ctime())

if __name__=='__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=sayHello, args = (lock,3))
p2 = multiprocessing.Process(target=sayHello, args = (lock,4))
# 主进程结束,子线程结束
p1.daemon = True
p2.daemon = True
p1.start()
p2.start()
# 主进程会在结束之前检查是否有子线程未完成
p1.join()
p2.join()
print 'end!'


Semaphore



#!/usr/bin/env python
# coding=utf-8

import multiprocessing
import time

def sayHello(s, i):
# 获取
s.acquire()
print multiprocessing.current_process().name + ' acquire'
time.sleep(i)
print multiprocessing.current_process().name + ' release'
# 释放
s.release()
if __name__=='__main__':
# 控制共享资源的访问数量
s = multiprocessing.Semaphore(2)
for i in range(4):
p = multiprocessing.Process(target=sayHello, args=(s, i))
p.start()





Event

#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

def wait_for1(e):
print 'wait_for_1: starting'
# 等待1s
e.wait(1)
print 'wait_for_1:{0}'.format(str(e.is_set()))

def wait_for2(e):
print 'wait_for_2: starting'
# 等待5s
e.wait(5)
print 'wait_for_2:{0}'.format(str(e.is_set()))
if __name__=='__main__':
# 在进程之间传递状态
e = multiprocessing.Event()

e1 = multiprocessing.Process(name = 'p1', target = wait_for1, args = (e,))
e2 = multiprocessing.Process(name = 'p2', target = wait_for2, args=(e,))

e1.start()
e2.start()

# 等待3s 之后设置状态
time.sleep(3)
e.set()

print 'Event Set Ok!'


Queue


  1. Queue.qsize() 返回队列的大小
  2. Queue.empty() 如果队列为空,返回True,反之False
  3. Queue.full() 如果队列满了,返回True,反之False
  4. Queue.get([block[, timeout]]) 获取队列,timeout等待时间
  5. Queue.get_nowait() 相当Queue.get(False)
  6. 非阻塞 Queue.put(item) 写入队列,timeout等待时间
  7. Queue.put_nowait(item) 相当Queue.put(item, False)



#!/usr/bin/env python
# coding=utf-8
import multiprocessing
import time

def writer_proc(q):
try:
for i in range(5):
time.sleep(1)
q.put(i,timeout = 2)
print 'put:',i
except:
pass
def reader_proc(q):
try:
for i in range(5):
j = q.get(timeout = 2)
print 'get:',j
except:
pass
if __name__=='__main__':
q = multiprocessing.Queue()
writer = multiprocessing.Process(target = writer_proc, args=(q,))
writer.start()

reader = multiprocessing.Process(target = reader_proc, args=(q,))
reader.start()


Pool



# coding: utf-8
import multiprocessing
import os, time, random


def Lee():
print "\nRun task Lee-%s" % (os.getpid()) # os.getpid()获取当前的进程的ID
start = time.time()
time.sleep(random.random() * 10) # random.random()随机生成0-1之间的小数
end = time.time()
print 'Task Lee, runs %0.2f seconds.' % (end - start)


def Marlon():
print "\nRun task Marlon-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 40)
end = time.time()
print 'Task Marlon runs %0.2f seconds.' % (end - start)


def Allen():
print "\nRun task Allen-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print 'Task Allen runs %0.2f seconds.' % (end - start)


def Frank():
print "\nRun task Frank-%s" % (os.getpid())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print 'Task Frank runs %0.2f seconds.' % (end - start)


if __name__ == '__main__':
function_list = [Lee, Marlon, Allen, Frank]
print "parent process %s" % (os.getpid())

pool = multiprocessing.Pool(4)
for func in function_list:
# apply 线程阻塞,执行线程代码的时候,会阻塞主进程
pool.apply_async(func) # Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

print 'Waiting for all subprocesses done...'
pool.close()
pool.join() # 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
print 'All subprocesses done.'




举报

相关推荐

0 条评论