Celery与FastAPI实现分布式异步任务队列 |
您所在的位置:网站首页 › python消息队列celery › Celery与FastAPI实现分布式异步任务队列 |
Celery 分布式任务队列框架,主要包括三个组成成分: [1] - Celery 客户端(Client) [2] - 消息中间件(Message Broker) [3] - Celery Worker 他们之间的关系如图: 这里,采用 FastAPI 作为 Celery Client,RabbitMQ 作为 Message Broker. 其中, [1] - Celery Client 运行 FastAPI app,并传递消息或后台任务(message/background jobs) 到 RabbitMQ; [2] - RabbitMQ 作为 Message Broker,将会调度 clients 和 workers 之间的消息; [3] - RabbitMQ 在接收到 client 发送的消息后,通过将消息发送到一个 Celery Worker 以初始化 client 任务; [4] - 一个 Celery Worker 被看做为后台任务,其可以从任何网络服务请求,实现异步性; [5] - 同时可以有很多 workers 进行或完成很多任务(每个任务作为一个独立线程thread); [6] - Celery 确保每个 worker 在同一时间点只执行一个任务,且每个任务只能被分配到一个 worker. 基于以上,实现如下: Github - azzan-amin-97/FastAPI_Async_Celery 1. 依赖项安装pip install fastapi pip install celery pip install uvicorn #ASGI server to run FastAPI app. pip install flower #任务队列监控设置 Message Broker(基于 Docker): docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:32. 创建 Celery Worker Taskcelery_worker.py #!/usr/bin/python3 #!--*-- coding: utf-8 --*-- import time from celery import Celery from celery.utils.log import get_task_logger #实例化 Celery celery = Celery('tasks', broker='amqp://guest:[email protected]:5672//') # 创建 logger,以显示日志信息 celery_log = get_task_logger(__name__) # 创建任务函数,以订单(Order) 为例,异步进行 @celery.task def create_order(name, quantity): # 5 seconds per 1 order complete_time_per_item = 5 # Keep increasing depending on item quantity being ordered time.sleep(complete_time_per_item * quantity) # 显示日志 celery_log.info(f"Order Complete!") return {"message": f"Hi {name}, Your order has completed!", "order_quantity": quantity}3. 创建 Model 和 Appmodel.py: #!/usr/bin/python3 #!--*-- coding: utf-8 --*-- from pydantic import BaseModel # Pydantic BaseModel # Order class model for request body class Order(BaseModel): customer_name: str order_quantity: intmain.py #!/usr/bin/python3 #!--*-- coding: utf-8 --*-- from fastapi import FastAPI from celery_worker import create_order from model import Order # Create FastAPI app app = FastAPI() # Create order endpoint @app.post('/order') def add_order(order: Order): # use delay() method to call the celery task create_order.delay(order.customer_name, order.order_quantity) return {"message": "Order Received! Thank you for your patience."}4. 运行 FastAPI 和 Celery Worker serveruvicorn main:app --reload访问:http://localhost:8000/docs 即可查看 FastAPI Swagger Docs. 启动 Celery Worker: celery -A celery_worker.celery worker --loglevel=info启动 flower 服务监控 Celery 消息队列: celery flower -A celery_worker.celery --broker:amqp://localhost//访问 http://localhost:5555/ 5. 测试和分析 Celery访问 http://localhost:8000/docs,示例如, 点击 Execute,返回结果如: |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |