因为自已要写一个和时间有关的方法,每过几秒钟之后要运行一个函数,但在主线程里写一个死循环来作定时器总是觉得不好。正好今天学习了一下python的多线程,可以拿来练手。写了下边的python定时器类,使用这个类你可以在你的代码中加入一个定时器。
代码如下(pytimer.py):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
import time
from Queue import
class
def
threading.Thread.__init__(self, name=t_name)
self.threadtimes = []
self.threadFunc = {}
self.lasttimes = {}
self.queue = queue
def
if newobj.func == None:
self.threadtimes.remove(newobj.secendtime)
None
None
else:
if newobj.secendtime in
self.threadFunc[str(newobj.secendtime)] = newobj.func
self.lasttimes[str(newobj.secendtime)] = int(time.time())
else:
self.threadtimes.append(newobj.secendtime)
self.threadFunc[str(newobj.secendtime)] = newobj.func
self.lasttimes[str(newobj.secendtime)] = int(time.time())
def
while(True):
if not
objtmp = self.queue.get()
self.setNewTimer(objtmp)
timetmp = int(time.time())
for tx in
if
self.lasttimes[str(tx)] = timetmp
self.threadFunc[str(tx)](timetmp)
self.condition.release()
class
def
self.secendtime = secendt
self.func = funct
class
def
self.queue = Queue()
self.cond = threading.Condition()
self.t_thread = _timerThread(str(int(time.time())),self.queue, self.cond)
self._timers = []
self._initTimer()
def
True)
self.t_thread.start()
def
return
def
objtmp = _timerObj(secendTime,Func)
self._timers.append(secendTime)
self.queue.put(objtmp)
def
None)
self._timers.remove(secendTime)
self.queue.put(objtmp)
功能测试(test.py):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
def
def
timerx = pytimer.pytimer()
2, timerCallBack)
while(True):
pass
if __name__ == '__main__':
main()
运行测试程序后输出结果:
1446334639
1446334641
1446334643
1446334645
1446334647
1446334649
1446334651
1446334653
...
运行结果与预期想要的结果相同。每两秒调用了一次定时器返回函数。
源码获取地址:
https://github.com/fengmm521/pytimer/tree/master