不妨令 run()
中为主逻辑,在 main()
中循环调用 run()
函数。并通过在 main()
中增加延迟实现定时调用的效果。
方法一:用间隔时间减去 run()
函数运行时间作为延时时间
def run():
print(time.time())
def main(distance: float):
while True:
start_time = time.time() # 当前这一次的开始时间
run()
end_time = time.time()
if end_time - start_time < distance:
time.sleep(distance - (end_time - start_time))
if __name__ == "__main__":
main(1.0)
在 run()
函数中打印进入函数的时间,结果如下:
1722653009.4056983
1722653010.4062116
1722653011.4063604
1722653012.406457
1722653013.4066129
1722653014.407105
1722653015.4072695
1722653016.4078603
1722653017.4081
1722653018.4087732
1722653019.40917
可以看到,每一次的延时时间均略微大于 1 秒。经过 10 次延时后,共计超出了 0.0035 秒,平均每次额外延时了 0.003 秒。每次延时的时间虽然不长,但是如果对每秒一次的精确性要求较高,或运行密度更高,也可能导致问题。这是因为在调用 time.sleep()
函数延时时间到达后,当前线程不一定能立即拿到时间片,当机器 CPU 资源紧张时,相差可能更多。
方法二:根据预期运行时间计算延时时间
要解决这个问题,我们可以在循环外保留预期运行时间。每次 run()
函数运行完成后,计算下一次预期运行时间,并计算当前时间距离下一次预期运行时间的间隔,作为延时时间。
def run(distance: float):
now_time = time.time() # 当前运行时间
while True:
run()
now_time += distance # 计算下一次预期运行时间
end_time = time.time()
if end_time < now_time:
time.sleep(now_time - end_time)
if __name__ == "__main__":
main(1.0)
在 run()
函数中打印进入函数的时间,结果如下:
1722653572.8364294
1722653573.8374188
1722653574.8369198
1722653575.8366668
1722653576.836901
1722653577.8365157
1722653578.8369763
1722653579.836831
1722653580.8369765
1722653581.8368018
1722653582.8365135
可以看到,虽然每一次的间隔时间略有波动,但可以保证每次间隔时间的期望为严格的间隔时间。
方法三:根据预期运行时间计算延时时间,并在超出预期时重置预期时间
虽然方法二可以保证每次间隔时间的预期为严格的间隔时间,但是如果 run()
函数运行时间不稳定,在高峰期是超过间隔时间,在低谷期时低于间隔时间,则会导致在进入低谷期后,因为实际时间已经晚于预期运行时间,从而使得每次 run()
函数运行结束后立即再次运行。例如,当 run()
函数逻辑如下时,打印的 run()
函数进入时间如下:
cnt = 0
def run():
global cnt
print(time.time())
if cnt < 5:
time.sleep(1.5)
cnt += 1
1722654393.7552695
1722654395.256099
1722654396.7561674
1722654398.2568493
1722654399.75726
1722654401.2577772
1722654401.2577772
1722654401.2577772
1722654401.7556682
1722654402.7556927
1722654403.7554414
可以看到,在 5 次 1.5 秒的 run()
运行后,后面在同一时刻连续调用了 3 次。
如果这是不满足预期的,那么可以在当运行时间超出时,重置预期运行时间。将 main()
方法修改如下:
def main(distance: float):
now_time = time.time() # 当前运行时间
while True:
run()
now_time += distance # 计算下一次预期运行时间
end_time = time.time()
if end_time < now_time:
time.sleep(now_time - end_time)
else:
now_time = end_time
再次运行后效果如下:
1722654671.8938496
1722654673.3943036
1722654674.8946393
1722654676.3951948
1722654677.8957179
1722654679.3958683
1722654680.3961017
1722654681.3961802
1722654682.3959875
1722654683.3959806
1722654684.3960238
工具函数
将上述三种模式,整理工具函数如下:
import time
from typing import Callable, NoReturn
__all__ = [
"timer_run_mode_1",
"timer_run_mode_2",
"timer_run_mode_3",
]
def timer_run_mode_1(func: Callable, distance: float) -> NoReturn:
"""每过 distance 调用一次 func 方法
模式 1:
- 当 func 运行时间小于 distance 时,每次间隔时间的期望会略大于 distance
- 当 func 运行时间大于 distance 时,不会延时
Parameters
----------
func : Callable
被调用的方法
distance : float
间隔时间
"""
while True:
start_time = time.time()
func()
end_time = time.time()
if end_time - start_time < distance:
time.sleep(distance - (end_time - start_time))
def timer_run_mode_2(func: Callable, distance: float) -> NoReturn:
"""每过 distance 调用一次 func 方法
模式 2:
- 每次间隔时间的期望严格等于 distance
- 当 func 运行时间小于 distance 时,每次间隔时间的期望严格等于 distance
- 当 func 运行时间大于 distance 时,不会延时,但会累积需要实际运行时间与期望运行时间的差异,在后续 func 运行时间小于 distance 时,不再延时直至追回期望时间
Parameters
----------
func : Callable
被调用的方法
distance : float
间隔时间
"""
now_time = time.time() # 当前运行时间
while True:
func()
now_time += distance # 根据当前运行时间,计算下一次预期运行时间
end_time = time.time()
if end_time < now_time:
time.sleep(now_time - end_time)
def timer_run_mode_3(func: Callable, distance: float) -> NoReturn:
"""每过 distance 调用一次 func 方法
模式 3:
- 当 func 运行时间小于 distance 时,每次间隔时间的期望严格等于 distance
- 当 func 运行时间大于 distance 时,不会延时,也不会积累实际运行时间与期望运行时间的差异;在后续 func 运行时间小于 distance 时,恢复每次间隔时间的期望严格等于 distance
Parameters
----------
func : Callable
被调用的方法
distance : float
间隔时间
"""
now_time = time.time() # 当前运行时间
while True:
func()
now_time += distance # 计算下一次预期运行时间
end_time = time.time()
if end_time < now_time:
time.sleep(now_time - end_time)
else:
now_time = end_time # 重置期望时间