前方高能!硬核源码剖析 Celery Beat 调度原理

您所在的位置:网站首页 ceiery什么意思中文 前方高能!硬核源码剖析 Celery Beat 调度原理

前方高能!硬核源码剖析 Celery Beat 调度原理

2024-01-21 16:43| 来源: 网络整理| 查看: 265

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列,同时也支持任务调度。

为了讲解 Celery Beat 的周期调度机制及实现原理,我们会基于Django从制作一个简单的周期任务开始,然后一步一步拆解 Celery Beat 的源代码。

相关前置应用知识,可以阅读以下文章:

1. 实战教程!Django Celery 异步与定时任务2. Python Celery异步快速下载股票数据

1.Celery 简单周期任务示例

在 celery_app.tasks.py 中添加如下任务:

@shared_task def pythondict_task():     print("pythondict_task")

在 django.celery.py 文件中添加如下配置:

from celery_django import settings from datetime import timedelta app.autodiscover_tasks(lambda : settings.INSTALLED_APPS) CELERYBEAT_SCHEDULE = {     'pythondict_task': {         'task': 'celery_app.tasks.pythondict_task',         'schedule': timedelta(seconds=3),     }, } app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)

至此,配置完成,此时,先启动 Celery Beat 定时任务命令:

celery beat -A celery_django -S django

然后打开第二个终端进程启动消费者:

celery -A celery_django worker

此时在worker的终端上就会输出类似如下的信息:

[2021-07-11 16:34:11,546: WARNING/PoolWorker-3] pythondict_task [2021-07-11 16:34:11,550: WARNING/PoolWorker-4] pythondict_task [2021-07-11 16:34:11,551: WARNING/PoolWorker-2] pythondict_task [2021-07-11 16:34:11,560: WARNING/PoolWorker-1] pythondict_task

看到结果正常输出,说明任务成功定时执行。

2.源码剖析

为了明白 Celery Beat 是如何实现周期任务调度的,我们需要从 Celery 源码入手。

当你执行 Celery Beat 启动命令的时候,到底发生了什么?

celery beat -A celery_django -S django

当你执行这个命令的时候,Celery/bin/celery.py 中的 CeleryCommand 类接收到命令后,会选择 beat 对应的类执行如下代码:

# Python 实用宝典 # https://pythondict.com from celery.bin.beat import beat class CeleryCommand(Command):     commands = {         # ...         'beat': beat,         # ...     }     # ...     def execute(self, command, argv=None):         try:             cls = self.commands[command]         except KeyError:             cls, argv = self.commands['help'], ['help']         cls = self.commands.get(command) or self.commands['help']         try:             return cls(                 app=self.app, on_error=self.on_error,                 no_color=self.no_color, quiet=self.quiet,                 on_usage_error=partial(self.on_usage_error, command=command),             ).run_from_argv(self.prog_name, argv[1:], command=argv[0])         except self.UsageError as exc:             self.on_usage_error(exc)             return exc.status         except self.Error as exc:             self.on_error(exc)             return exc.status

此时cls对应的是beat类,通过查看位于bin/beat.py中的 beat 类可知,该类只重写了run方法和add_arguments方法。

所以此时执行的 run_from_argv 方法是 beat 继承的 Command 的 run_from_argv 方法:

# Python 实用宝典 # https://pythondict.com def run_from_argv(self, prog_name, argv=None, command=None):     return self.handle_argv(prog_name, sys.argv if argv is None else argv, command)

该方法中会调用 Command 的 handle_argv 方法,而该方法在经过相关参数处理后会调用 self(*args, **options) 到 __call__ 函数:

     # Python 实用宝典     # https://pythondict.com          def handle_argv(self, prog_name, argv, command=None):         """Parse command-line arguments from ``argv`` and dispatch         to :meth:`run`.         :param prog_name: The program name (``argv[0]``).         :param argv: Command arguments.         Exits with an error message if :attr:`supports_args` is disabled         and ``argv`` contains positional arguments.         """         options, args = self.prepare_args(             *self.parse_options(prog_name, argv, command))         return self(*args, **options)

Command 类的 __call__函数:

    # Python 实用宝典     # https://pythondict.com          def __call__(self, *args, **kwargs):         random.seed() # maybe we were forked.         self.verify_args(args)         try:             ret = self.run(*args, **kwargs)             return ret if ret is not None else EX_OK         except self.UsageError as exc:             self.on_usage_error(exc)             return exc.status         except self.Error as exc:             self.on_error(exc)             return exc.status

可见,在该函数中会调用到run方法,此时调用的run方法就是beat类中重写的run方法,查看该方法:

# Python 实用宝典 # https://pythondict.com      class beat(Command):     """Start the beat periodic task scheduler.     Examples::         celery beat -l info         celery beat -s /var/run/celery/beat-schedule --detach         celery beat -S djcelery.schedulers.DatabaseScheduler     """     doc = __doc__     enable_config_from_cmdline = True     supports_args = False     def run(self, detach=False, logfile=None, pidfile=None, uid=None,             gid=None, umask=None, working_directory=None, **kwargs):         # 是否开启后台运行         if not detach:             maybe_drop_privileges(uid=uid, gid=gid)         workdir = working_directory         kwargs.pop('app', None)         # 设定偏函数         beat = partial(self.app.Beat,                        logfile=logfile, pidfile=pidfile, **kwargs)         if detach:             with detached(logfile, pidfile, uid, gid, umask, workdir):                 return beat().run() # 后台运行         else:             return beat().run() # 立即运行

这里引用了偏函数的知识,偏函数就是从基函数创建一个新的带默认参数的函数,详细可见廖雪峰老师的介绍:https://www.liaoxuefeng.com/wiki/1016959663602400/1017454145929440

可见,此时创建了app的Beat方法的偏函数,并通过 .run 函数执行启动 beat 进程,首先看看这个 beat 方法:

    # Python 实用宝典     # https://pythondict.com     @cached_property     def Beat(self, **kwargs):         # 导入celery.apps.beat:Beat类         return self.subclass_with_self('celery.apps.beat:Beat')

可以看到此时就实例化了 celery.apps.beat 中的 Beat 类,并调用了该实例的 run 方法:

    # Python 实用宝典     # https://pythondict.com     def run(self):         print(str(self.colored.cyan(             'celery beat v{0} is starting.'.format(VERSION_BANNER))))         # 初始化loader         self.init_loader()         # 设置进程         self.set_process_title()         # 开启任务调度         self.start_scheduler()

init_loader 中,会导入默认的modules,此时会引入相关的定时任务,这些不是本文重点。我们重点看 start_scheduler 是如何开启任务调度的:

    # Python 实用宝典     # https://pythondict.com     def start_scheduler(self):         c = self.colored         if self.pidfile: # 是否设定了pid文件             platforms.create_pidlock(self.pidfile) # 创建pid文件         # 初始化service         beat = self.Service(app=self.app,                             max_interval=self.max_interval,                             scheduler_cls=self.scheduler_cls,                             schedule_filename=self.schedule)                  # 打印启动信息         print(str(c.blue('__ ', c.magenta('-'),                   c.blue(' ... __ '), c.magenta('-'),                   c.blue(' _\n'),                   c.reset(self.startup_info(beat)))))         # 开启日志         self.setup_logging()         if self.socket_timeout:             logger.debug('Setting default socket timeout to %r',                          self.socket_timeout)             # 设置超时             socket.setdefaulttimeout(self.socket_timeout)         try:             # 注册handler             self.install_sync_handler(beat)             # 开启beat             beat.start()         except Exception as exc:             logger.critical('beat raised exception %s: %r',                             exc.__class__, exc,                             exc_info=True)

我们看下beat是如何开启的:

    # Python 实用宝典     # https://pythondict.com     def start(self, embedded_process=False, drift=-0.010):         info('beat: Starting...')         # 打印最大间隔时间         debug('beat: Ticking with max interval->%s',               humanize_seconds(self.scheduler.max_interval))                  # 通知注册该signal的函数         signals.beat_init.send(sender=self)         if embedded_process:             signals.beat_embedded_init.send(sender=self)             platforms.set_process_title('celery beat')         try:             while not self._is_shutdown.is_set():                 # 调用scheduler.tick()函数检查还剩多余时间                 interval = self.scheduler.tick()                 interval = interval + drift if interval else interval                 # 如果大于0                 if interval and interval > 0:                     debug('beat: Waking up %s.',                           humanize_seconds(interval, prefix='in '))                     # 休眠                     time.sleep(interval)                     if self.scheduler.should_sync():                         self.scheduler._do_sync()         except (KeyboardInterrupt, SystemExit):             self._is_shutdown.set()         finally:             self.sync()

这里重点看 self.scheduler.tick() 方法:

    # Python 实用宝典     # https://pythondict.com     def tick(self):         """Run a tick, that is one iteration of the scheduler.         Executes all due tasks.         """         remaining_times = []         try:             # 遍历每个周期任务设定             for entry in values(self.schedule):                 # 下次运行时间                 next_time_to_run = self.maybe_due(entry, self.publisher)                 if next_time_to_run:                     remaining_times.append(next_time_to_run)         except RuntimeError:             pass         return min(remaining_times + [self.max_interval])

这里通过 self.schedule 拿到了所有存放在用 shelve 写入的 celerybeat-schedule 文件的定时任务,遍历所有定时任务,调用 self.maybe_due 方法:

    # Python 实用宝典     # https://pythondict.com     def maybe_due(self, entry, publisher=None):         # 是否到达运行时间         is_due, next_time_to_run = entry.is_due()         if is_due:             # 打印任务发送日志             info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)             try:                 # 执行任务                 result = self.apply_async(entry, publisher=publisher)             except Exception as exc:                 error('Message Error: %s\n%s',                       exc, traceback.format_stack(), exc_info=True)             else:                 debug('%s sent. id->%s', entry.task, result.id)         return next_time_to_run

可以看到,此处会判断任务是否到达定时时间,如果是的话,会调用 apply_async 调用Worker执行任务。如果不是,则返回下次运行时间,让 Beat 进程进行 Sleep,减少进程资源消耗。

到此,我们就讲解完了 Celery Beat 在周期定时任务的检测调度机制,怎么样,小伙伴们有没有什么疑惑?可以在下方留言区留言一起讨论哦。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红字验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

点击下方阅读原文可获得更好的阅读体验

Python实用宝典 (pythondict.com)不只是一个宝典欢迎关注公众号:Python实用宝典



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3