Python-库开发-定时任务框架

2019-03-04

本博客所有文章采用的授权方式为 自由转载-非商用-非衍生-保持署名 ,转载请务必注明出处,谢谢。

声明: 本博客欢迎转发,但请保留原作者信息!
github地址:atanx
新浪微博:@蜀山掌门V
QQ:365039667
博客地址:江斌的博客
内容仅供学习参考,如有不当引用,请告知博主。
#!/usr/bin/env python
# coding=utf-8

import time
import threading


class Scheduler(object):
	def __init__(self):
		self.tasks = {}

	def add_task(self, task_id, interval, callback, replaced=False):
		if task_id in self.tasks and replaced is False:
			# raise KeyError(u'存在相同id的task')
			return False
		self.tasks[task_id] = dict(interval=interval, callback=callback, elasped=0)
		return True

	def run(self):
		while True:
			time.sleep(1)
			for task_id, task in self.tasks.iteritems():
				task['elasped'] += 1
				elasped = task['elasped']
				interval = task['interval']
				callback = task['callback']
				if elasped % interval == 0:
					callback()


def task1():
	print "task1"


def task2():
	print "task2"


def task3():
	print "task3"


if __name__ == '__main__':
	s = Scheduler()
	s.add_task(task_id=1, interval=10, callback=task1)
	s.add_task(task_id=2, interval=3, callback=task2)
	loop_thread = threading.Thread(target=s.run)  # 循环线程
	# loop_thread.join()
	s.add_task(task_id=3, interval=4, callback=task3)


章节列表