Python-库开发-定时任务框架
2019-03-04
本博客所有文章采用的授权方式为 自由转载-非商用-非衍生-保持署名 ,转载请务必注明出处,谢谢。
声明: 本博客欢迎转发,但请保留原作者信息!
github地址:atanx
新浪微博:@蜀山掌门V
QQ:365039667
博客地址:江斌的博客
内容仅供学习参考,如有不当引用,请告知博主。
#!/usr/bin/env python
# coding=utf-8
import time
import threading
class Scheduler(object):
def __init__(self):
self.tasks = {}
def add_task(self, task_id, interval, callback, replaced=False):
if task_id in self.tasks and replaced is False:
# raise KeyError(u'存在相同id的task')
return False
self.tasks[task_id] = dict(interval=interval, callback=callback, elasped=0)
return True
def run(self):
while True:
time.sleep(1)
for task_id, task in self.tasks.iteritems():
task['elasped'] += 1
elasped = task['elasped']
interval = task['interval']
callback = task['callback']
if elasped % interval == 0:
callback()
def task1():
print "task1"
def task2():
print "task2"
def task3():
print "task3"
if __name__ == '__main__':
s = Scheduler()
s.add_task(task_id=1, interval=10, callback=task1)
s.add_task(task_id=2, interval=3, callback=task2)
loop_thread = threading.Thread(target=s.run) # 循环线程
# loop_thread.join()
s.add_task(task_id=3, interval=4, callback=task3)