分布式任务调度celery与进程管理supervisord

您所在的位置:网站首页 celery任务调度 分布式任务调度celery与进程管理supervisord

分布式任务调度celery与进程管理supervisord

2024-07-15 09:53| 来源: 网络整理| 查看: 265

一、使用celery的原因

分布式任务调度框架celery及其监控工具flower,Linux进程管理工具supervisor

项目痛点: 1、代码上线及运维困难,新代码上线必须保证系统中没有正在运行的异步任务,等待任务结束期间无法保证系统不在接收新任务。(项目中进程多是以multiprocessing方式启动) 2、重启困难,重启后不知道是否启动成功,必须手动curl测试接口保证系统重启成功,缺少重启监控机制。

痛点解决: 1、celery解决中断任务痛点,所有异步任务均由celery下发。可单独重启一个worker或所有worker。重启worker时保证当前worker正在消费的任务重新回到队列,等待处于工作状态的worker消费。不同worker可运行不同版本的代码。 2、supervisor解决重启痛点,新架构中一个节点会启多worker以及flower和后端服务,具有大量进程需要管理,手动管理已然不现实。supervisor可对启动异常的进程自动重启也可对异常退出的进程进行拉起,并且提供客户端和web界面。

二、架构图 celery 架构.png 三、调度框架celery

celery中的几个概念

1、broker 消息传输中间件,可以简单理解为队列,支持RabbitMQ,Redis,SQS(某些博客说支持sqlalchemy,官网未找到,实验也未成功)。celery对Redis Cluster类型的redis集群支持不是很好,目前正在寻找解决方案。 2、exchange 路由,可将特定任务路由到指定队列。 3、worker 消费者。会在多节点启多worker 4、task 异步任务。某些任务需要指定消费节点。所以触发任务时需要显式指定该任务的存放的队列,task.apply_async(queue='q1')。未指定的将会放到default队列,由三个节点竞争。 5、backend 结果存储。可使用mq,redis,nosql、mysql等。存放任务执行的结果。

1、使用方案 一、异步任务

设置default,q1,q2,q3四个队列,各节点会监听各自的队列,并且所有节点都监听default队列。 编写异步任务和正常写函数是一样的,最后只需要对该函数使用装饰器@celery.task将该任务注册为异步任务。如果有多个装饰器进行组合使用时,必须确保 task() 装饰器被放置在首位:

@app.task @decorator2 @decorator1 def add(x, y): return x + y

触发任务 简单触发时可使用 delay ,但是该方法无法指定存放的队列,因此该任务会被放到默认队列

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

如果需要设置额外的行参数,必须用 apply_async

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x'}, queue='q1')

启动worker

celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-1@%%h 二、beat任务(定时任务)

对于celery产生的定时任务如果放到一个队列里,该任务被一个worker拿到后其他worker将获取不到该任务。这样会产生一个现象即该任务只在一个节点执行了,但业务上需要的是该任务在各个节点都执行。 对此现象的解决方案是各节点需要定制各节点的定时任务并放到各自的队列里。对于任务a生成者会将其产生三份对下发到三个节点。对于任务b,node1并不需要执行,因此会产生两份并将其下发到node2和node3上。 这并不意味同一个任务需要编写三份,任务编写完后只需要将其注册各节点对应的配置里(需要自己实现)。 为不影响其他异步任务执行,beat将会由各节点单独的worker进行消费。

产生beat任务

celery -A app.celery beat -l info

消费beat任务

celery -A app.celery worker -l info -Q node1-crontab --concurrency=10 -n node1-worker-crontab@%%h

这里的node1-crontab为新的队列,专门存放node1节点需要消费的定时任务。

三、task是如何工作的

这里会说明为什么不同worker可以运行不同版本的代码,甚至生产者和消费者之间也可以运行不同版本的代码。

celery的任务是注册在注册表中,该表中注册了任务名和任务类。说人话就是celery会在队列中传递任务的模块,例如proj模块中有一个task.py,该文件中编写了一个叫add的异步任务(函数),那么celery传递的就是proj.task.add,只要保证消费该任务的worker中有该模块该文件该函数就行,worker并不关心该函数里是怎样执行的,是否和生产者一致。

任务状态

PENDING 任务正在等待执行或未知。任何未知的任务 ID 都默认处于挂起状态。 STARTED 任务开始执行 SUCCESS 任务执行成功 FAILURE 任务执行失败 RETRY 任务处于重试状态,这里指在task中捕获到异常并显式调用celery使其重试 REVOKED 任务被撤销 @celery.task(bind=True) def send_twitter_status(self, oauth, tweet): try: twitter = Twitter(oauth) twitter.update_status(tweet) except (Twitter.FailWhaleError, Twitter.LoginError) as exc: raise self.retry(exc=exc) 2、celery监控工具flower

Flower是基于web的监控和管理celery工具

flower可以

用Celery事件实时监控,显示任务的详细信息,图形化和统计 查看worker状态和统计,查看当前正在运行的tasks Broker monitoring(中间人监控),查看所有Celery 队列的统计,队列长度图

flower只需启动在生产者端即可

截图展示

image.png 四、进程管理工具supervisor

粗略估计在node1上会启后端服务,celery worker三个,定时任务消费worker一个,celery beat一个,flower进程。这么多进程用手工一个个启动肯定要花费大量时间,于是用supervisor管理这些进程。 supervisor会已启动自己子进程的方式开启进程,可以对异常退出的进程进行重启操作。 supervisor可以分为三个部分

supervisord 服务端,主要负责启动与管理进程,响应客户端的请求 supervisorctl 客户端,提供一个命令行来使用supervisord提供的服务 web界面 用来查看与管理子进程 1、子进程配置 [program:worker] command=celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-%(process_num)s@%%h ; 启动命令 process_name=%(program_name)s-%(process_num)d ; 进程名 numprocs=3 ; 进程数量 directory=/Users/aaa/PycharmProjects/flask_test ; 工作路径 ;umask=022 ; umask for process (default None) priority=999 ; 优先级。优先级低,最先启动,关闭的时候最后关闭 autostart=true ; supervisor启动后自动启动 startsecs=1 ; 启动多少秒后是running认为启动成功 ;startretries=3 ; 最大启动重试次数 (default 3) autorestart=true ; 子进程挂掉自动重启 (def: unexpected) ;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0) stopsignal=TERM ; 进程停止信号,停止celery worker时使用TERM, (TERM, HUP, INT, QUIT, KILL, USR1, or USR2) stopwaitsecs=30 ; 等待停止最大时间,超过此时间会强制kill (default 10) stopasgroup=true ; 停掉子进程的子进程(保证不会出现孤儿进程) ;killasgroup=true ; kill进程及其子进程,直接发送KILL信号不会等待进程退出 ;user=chrism ; 管理子进程的用户 redirect_stderr=true ; redirect 日志 stderr to stdout stdout_logfile=/Users/aaa/PycharmProjects/flask_test/log/node1/celery-worker-1.log ; 日志 stdout_logfile_maxbytes=50MB ; 单个日志文件最大大小 (default 50MB) stdout_logfile_backups=20 ; 日志文件数量 (default 10) ;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0) ;stdout_events_enabled=false ; emit events on stdout writes (default false) ;stdout_syslog=false ; send stdout to syslog with process name (default false) ;stderr_logfile=/Users/aaa/PycharmProjects/flask_test/log/default/celery_err.log ; 错误日志 ;stderr_logfile_maxbytes=10MB ; max # logfile bytes b4 rotation (default 50MB) ;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10) ;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0) ;stderr_events_enabled=false ; emit events on stderpidr writes (default false) ;stderr_syslog=false ; send stderr to syslog with process name (default false) environment=PATH="/Users/aaa/anaconda3/envs/flask_test/bin" ; 环境变量,子进程间不共享 ;serverurl=AUTO ; override serverurl computation (childutils)

supervisor配置文件放在supervisord.conf中

启动supervisord

supervisord -c supervisord.conf

对于celery worker节点进程退出信号使用TERM,TERM信号会使worker进行热关机,worker会将未消费完的任务放回到队列。

发现丢任务的情况: 假设worker1正在消费任务3个,worker2正在消费任务4个。将worker1关机,3个任务会进入到worker2,再将work2关机后打开worker3,这时会发现少了两个任务。

丢任务的解决方案: 方案一:启动一个worker然后将其关机后未消费完的任务可以全部回到队列,需要重启时可以先将未有消费任务的worker进行重启,然后再停掉正在消费的worker。或者只停掉一个worker。 方案二:supervisor进程组的概念,直接将进程组重启

代码更新后只需重启子进程,不需要重启supervisord

进程组配置

[group:celery-worker] programs=worker ;上面实例三个进程会默认分配到名为worker的进程组,这里定义进程组会覆盖默认的 priority=999 ; the relative start priority (default 999)

对进程组进行操作等同于对进程组下所有的进程操作

对于进程组操作在进程组名后需要加上冒号即 celery-worker:

2、进程数说明

问题:celery启动命令中已经指定了 --concurrency=10 参数配置worker中开启的进程数量,为什么在supervisord中还要指定 numprocs=3 进程数呢?

答:这两个参数指定的进程数量是不同的意义。在celery中指定进程数即意味着单个worker中可开启的最大进程数据量。在supervisord指定的进程数会直接开启三个worker,相当将定义的cmd执行了三次。

supervisord中如果指定numprocs的同时也需要指定 process_name=%(program_name)s-%(process_num)d ,原因在于如果多个进程使用相同的进程名会报错,所以需要指定不同的进程名。program_name为 [program:worker] 中定义的名字,即worker。process_num为进程的序号,从1开始,注意它不是pid。

使用numprocs=3 创建的三个worker默认会被放到一个名为worker(在哪里定义)的进程组里,如果在后面定义一个新的进程组并将worker放进去则这三个worker会的默认进程组会被替换为新的进程组,同时新的进程组里也可以放一个在其它program里定义的进程。

[group:node1-celery-worker] programs=worker,crontab-worker # crontab-worker为在其它program里定义的进程 priority=999 3、supervisorctl命令 supervisorctl start ${program} # 启动进程 supervisorctl stop ${program} # 停止进程 supervisorctl restart ${program} # 重启进程 supervisorctl status ${program} # 查看进程状态 supervisorctl update # 重新载入配置文件 supervisorctl shutdown # 关闭supervisord服务 supervisorctl reload # 重启supervisord服务 supervisorctl stop all # 停止所有进程

对于已经配置好的supervisor并不需要进行supervisord级别的重启以及重新载入配置。代码更新后只需重启子进程即可加载最新代码。

image.png 4、supervisor界面

将supervisorctl的命令可视化,可以直接点点点,此外还可以展示子进程的日志。实则感觉有supervisorctl就可以了

image.png


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3