python定时任务 每天运行一次

您所在的位置:网站首页 python运行一次再次运行 python定时任务 每天运行一次

python定时任务 每天运行一次

2023-09-22 05:12| 来源: 网络整理| 查看: 265

背景

最近有个需求,需要实现一个定时或定期任务的功能,需要实现每月、每日、每时、一次性等需求,必须是轻量级不依赖其它额外组件,并能支持动态添加任务。由于当前任务信息保存在集群 ETCD 数据库中,因此对任务持久化要求不高,每次重启都直接读取 ETCD 任务信息,为了后面扩展,还需要添加任务持久化功能。

定时任务库对比

根据上面需求,从社区中找到了几个 Python 好用的任务调度库。有以下几个库:schedule:Python job scheduling for humans. 轻量级,无需配置的作业调度库

python-crontab: 针对系统 Cron 操作 crontab 文件的作业调度库

Apscheduler:一个高级的 Python 任务调度库

Celery: 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度

优缺点对比:schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务

Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求

Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源(如 redis、MongoDB),支持接入到各种异步框架(如 gevent、asyncio、tornado)

Celery 支持配置定期任务、支持 crontab 模式配置,不支持一次性定时任务

schedule 库

人类的Python 任务调度库,和 requests 库一样 for humans. 这个库也是最轻量级的一个任务调度库,schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。

直接使用 pip install schedule进行安装使用,下面来看看官网给的示例:

import schedule

import time

# 定义你要周期运行的函数

def job():

print("I'm working...")

schedule.every(10).minutes.do(job) # 每隔 10 分钟运行一次 job 函数

schedule.every().hour.do(job) # 每隔 1 小时运行一次 job 函数

schedule.every().day.at("10:30").do(job) # 每天在 10:30 时间点运行 job 函数

schedule.every().monday.do(job) # 每周一 运行一次 job 函数

schedule.every().wednesday.at("13:15").do(job) # 每周三 13:15 时间点运行 job 函数

schedule.every().minute.at(":17").do(job) # 每分钟的 17 秒时间点运行 job 函数

while True:

schedule.run_pending() # 运行所有可以运行的任务

time.sleep(1)

通过上面示例,可以很容易学会使用 schedule 库,可以设置秒、分钟、小时、天、周来运行任务,然后通过一个死循环,一直不断地运行所有的计划任务。

schedule 常见问题

1、如何并行执行任务?

schedule 是阻塞式的,默认情况下, schedule 按顺序执行所有的作业,不能达到并行执行任务。如下所示:

import arrow

import schedule

def job1():

print("job1 start time: %s" % arrow.get().format())

time.sleep(2)

print("job1 end time: %s" % arrow.get().format())

def job2():

print("job2 start time: %s" % arrow.get().format())

time.sleep(5)

print("job2 end time: %s" % arrow.get().format())

def job3():

print("job3 start time: %s" % arrow.get().format())

time.sleep(10)

print("job3 end time: %s" % arrow.get().format())

if __name__ == '__main__':

schedule.every(10).seconds.do(job1)

schedule.every(30).seconds.do(job2)

schedule.every(5).to(10).seconds.do(job3)

while True:

schedule.run_pending()

返回部分结果如下所示,几个任务并不是并行开始的,是安装时间顺序先后开始的:

job3 start time: 2019-06-01 09:27:54+00:00

job3 end time: 2019-06-01 09:28:04+00:00

job1 start time: 2019-06-01 09:28:04+00:00

job1 end time: 2019-06-01 09:28:06+00:00

job3 start time: 2019-06-01 09:28:13+00:00

job3 end time: 2019-06-01 09:28:23+00:00

job2 start time: 2019-06-01 09:28:23+00:00

job2 end time: 2019-06-01 09:28:28+00:00

job1 start time: 2019-06-01 09:28:28+00:00

job1 end time: 2019-06-01 09:28:30+00:00

job3 start time: 2019-06-01 09:28:30+00:00

job3 end time: 2019-06-01 09:28:40+00:00

job1 start time: 2019-06-01 09:28:40+00:00

job1 end time: 2019-06-01 09:28:42+00:00

如果需要实现并行,那么使用多线程方式运行任务,官方给出的并行方案如下:

import threading

import time

import schedule

def job():

print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):

job_thread = threading.Thread(target=job_func)

job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)

schedule.every(10).seconds.do(run_threaded, job)

schedule.every(10).seconds.do(run_threaded, job)

schedule.every(10).seconds.do(run_threaded, job)

schedule.every(10).seconds.do(run_threaded, job)

while 1:



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3