Celery与FastAPI实现分布式异步任务队列

您所在的位置:网站首页 python消息队列celery Celery与FastAPI实现分布式异步任务队列

Celery与FastAPI实现分布式异步任务队列

2023-04-15 06:43| 来源: 网络整理| 查看: 265

Celery 分布式任务队列框架,主要包括三个组成成分:

[1] - Celery 客户端(Client)

[2] - 消息中间件(Message Broker)

[3] - Celery Worker

他们之间的关系如图:

这里,采用 FastAPI 作为 Celery Client,RabbitMQ 作为 Message Broker.

其中,

[1] - Celery Client 运行 FastAPI app,并传递消息或后台任务(message/background jobs) 到 RabbitMQ;

[2] - RabbitMQ 作为 Message Broker,将会调度 clients 和 workers 之间的消息;

[3] - RabbitMQ 在接收到 client 发送的消息后,通过将消息发送到一个 Celery Worker 以初始化 client 任务;

[4] - 一个 Celery Worker 被看做为后台任务,其可以从任何网络服务请求,实现异步性;

[5] - 同时可以有很多 workers 进行或完成很多任务(每个任务作为一个独立线程thread);

[6] - Celery 确保每个 worker 在同一时间点只执行一个任务,且每个任务只能被分配到一个 worker.

基于以上,实现如下:

Github - azzan-amin-97/FastAPI_Async_Celery

1. 依赖项安装pip install fastapi pip install celery pip install uvicorn #ASGI server to run FastAPI app. pip install flower #任务队列监控

设置 Message Broker(基于 Docker):

docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:32. 创建 Celery Worker Task

celery_worker.py

#!/usr/bin/python3 #!--*-- coding: utf-8 --*-- import time from celery import Celery from celery.utils.log import get_task_logger #实例化 Celery celery = Celery('tasks', broker='amqp://guest:[email protected]:5672//') # 创建 logger,以显示日志信息 celery_log = get_task_logger(__name__) # 创建任务函数,以订单(Order) 为例,异步进行 @celery.task def create_order(name, quantity): # 5 seconds per 1 order complete_time_per_item = 5 # Keep increasing depending on item quantity being ordered time.sleep(complete_time_per_item * quantity) # 显示日志 celery_log.info(f"Order Complete!") return {"message": f"Hi {name}, Your order has completed!", "order_quantity": quantity}3. 创建 Model 和 App

model.py:

#!/usr/bin/python3 #!--*-- coding: utf-8 --*-- from pydantic import BaseModel # Pydantic BaseModel # Order class model for request body class Order(BaseModel): customer_name: str order_quantity: int

main.py

#!/usr/bin/python3 #!--*-- coding: utf-8 --*-- from fastapi import FastAPI from celery_worker import create_order from model import Order # Create FastAPI app app = FastAPI() # Create order endpoint @app.post('/order') def add_order(order: Order): # use delay() method to call the celery task create_order.delay(order.customer_name, order.order_quantity) return {"message": "Order Received! Thank you for your patience."}4. 运行 FastAPI 和 Celery Worker serveruvicorn main:app --reload

访问:http://localhost:8000/docs 即可查看 FastAPI Swagger Docs.

启动 Celery Worker:

celery -A celery_worker.celery worker --loglevel=info

启动 flower 服务监控 Celery 消息队列:

celery flower -A celery_worker.celery --broker:amqp://localhost//

访问 http://localhost:5555/

5. 测试和分析 Celery

访问 http://localhost:8000/docs,示例如,

点击 Execute,返回结果如:



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3