python定时任务 每天运行一次 |
您所在的位置:网站首页 › python运行一次再次运行 › python定时任务 每天运行一次 |
背景 最近有个需求,需要实现一个定时或定期任务的功能,需要实现每月、每日、每时、一次性等需求,必须是轻量级不依赖其它额外组件,并能支持动态添加任务。由于当前任务信息保存在集群 ETCD 数据库中,因此对任务持久化要求不高,每次重启都直接读取 ETCD 任务信息,为了后面扩展,还需要添加任务持久化功能。 定时任务库对比 根据上面需求,从社区中找到了几个 Python 好用的任务调度库。有以下几个库:schedule:Python job scheduling for humans. 轻量级,无需配置的作业调度库 python-crontab: 针对系统 Cron 操作 crontab 文件的作业调度库 Apscheduler:一个高级的 Python 任务调度库 Celery: 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度 优缺点对比:schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务 Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求 Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源(如 redis、MongoDB),支持接入到各种异步框架(如 gevent、asyncio、tornado) Celery 支持配置定期任务、支持 crontab 模式配置,不支持一次性定时任务 schedule 库 人类的Python 任务调度库,和 requests 库一样 for humans. 这个库也是最轻量级的一个任务调度库,schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。 直接使用 pip install schedule进行安装使用,下面来看看官网给的示例: import schedule import time # 定义你要周期运行的函数 def job(): print("I'm working...") schedule.every(10).minutes.do(job) # 每隔 10 分钟运行一次 job 函数 schedule.every().hour.do(job) # 每隔 1 小时运行一次 job 函数 schedule.every().day.at("10:30").do(job) # 每天在 10:30 时间点运行 job 函数 schedule.every().monday.do(job) # 每周一 运行一次 job 函数 schedule.every().wednesday.at("13:15").do(job) # 每周三 13:15 时间点运行 job 函数 schedule.every().minute.at(":17").do(job) # 每分钟的 17 秒时间点运行 job 函数 while True: schedule.run_pending() # 运行所有可以运行的任务 time.sleep(1) 通过上面示例,可以很容易学会使用 schedule 库,可以设置秒、分钟、小时、天、周来运行任务,然后通过一个死循环,一直不断地运行所有的计划任务。 schedule 常见问题 1、如何并行执行任务? schedule 是阻塞式的,默认情况下, schedule 按顺序执行所有的作业,不能达到并行执行任务。如下所示: import arrow import schedule def job1(): print("job1 start time: %s" % arrow.get().format()) time.sleep(2) print("job1 end time: %s" % arrow.get().format()) def job2(): print("job2 start time: %s" % arrow.get().format()) time.sleep(5) print("job2 end time: %s" % arrow.get().format()) def job3(): print("job3 start time: %s" % arrow.get().format()) time.sleep(10) print("job3 end time: %s" % arrow.get().format()) if __name__ == '__main__': schedule.every(10).seconds.do(job1) schedule.every(30).seconds.do(job2) schedule.every(5).to(10).seconds.do(job3) while True: schedule.run_pending() 返回部分结果如下所示,几个任务并不是并行开始的,是安装时间顺序先后开始的: job3 start time: 2019-06-01 09:27:54+00:00 job3 end time: 2019-06-01 09:28:04+00:00 job1 start time: 2019-06-01 09:28:04+00:00 job1 end time: 2019-06-01 09:28:06+00:00 job3 start time: 2019-06-01 09:28:13+00:00 job3 end time: 2019-06-01 09:28:23+00:00 job2 start time: 2019-06-01 09:28:23+00:00 job2 end time: 2019-06-01 09:28:28+00:00 job1 start time: 2019-06-01 09:28:28+00:00 job1 end time: 2019-06-01 09:28:30+00:00 job3 start time: 2019-06-01 09:28:30+00:00 job3 end time: 2019-06-01 09:28:40+00:00 job1 start time: 2019-06-01 09:28:40+00:00 job1 end time: 2019-06-01 09:28:42+00:00 如果需要实现并行,那么使用多线程方式运行任务,官方给出的并行方案如下: import threading import time import schedule def job(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) while 1: |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |