Python 全栈系列48

您所在的位置:网站首页 celery异步结果多久会清理 Python 全栈系列48

Python 全栈系列48

2024-07-14 21:52| 来源: 网络整理| 查看: 265

说明

有一些非常耗时的任务,无法实现实时的RPC调用。因此计划使用celery + flask提供异步任务调度服务。 一个请求的服务过程是这样:

1 服务器接到一个请求(一个几k到几百k的文本)2 服务器计算摘要作为键值,将其加入异步任务。3 服务器将摘要返回,状态为calculating。4 异步任务执行耗时计算,结果有两个副本。一个存在本地(pkl),一个发往目标服务器。这样如果目标服务器没收到,服务就把本地的副本读取再发送,避免重新计算。 1 流程与项目结构

大体上是这样的结构,整个服务存在训练任务和预测任务。预测结果保存在本地和目标数据库。 在这里插入图片描述 celery和flask结合使用的基本结构主要参考这篇文章, 一目了然,很适合初次使用者。 以下这部分引用自上面的文章

from flask import Flask from celery import Celery from celery.result import AsyncResult import time app = Flask(__name__) # 用以储存消息队列 app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/0' # 用以储存处理结果 app.config['CELERY_RESULT_BACKEND'] = 'redis://127.0.0.1:6379/0' celery_ = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery_.conf.update(app.config) @celery_.task def my_background_task(arg1, arg2): # 两数相加 time.sleep(10) return arg1+arg2 @app.route("/sum//") def sum_(arg1, arg2): # 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果 result = my_background_task.delay(int(arg1), int(arg2)) return result.id @app.route("/get_result/") def get_result(result_id): # 根据任务ID获取任务结果 result = AsyncResult(id=result_id) return str(result.get())

启动的时候分别在三个终端输入命令:

# 运行flask服务 gunicorn entry_ner_cpu:app -b 0.0.0.0:5001 # 使用celery执行异步任务 celery -A entry_ner_cpu.celery_ worker # 使用flower监督异步任务 flower --basic_auth=admin:admin --broker=redis://127.0.0.1:6379/0 --address=0.0.0.0 --port=5556

作用分别是启动flask服务(基本web服务),启动celery服务(异步任务),启动flower服务(监控任务)。

1 flower的监控面板 在这里插入图片描述

2 执行任务接口 调用计算任务的接口。(该计算任务计算1+2) 在这里插入图片描述

3 查询任务结果的接口 使用任务ID获取结果。 在这里插入图片描述

2 测试案例

有些文章也介绍了celery的配置和flask是有不同的,单独配置。

2.1 Celery 项目结构

先把celery包装成一个类似函数包的方式,项目结构类似,内容参考这篇文章

在这里插入图片描述 实际我做了一个小的项目包测试,和原文略有不同。(这个算是尝试版,可以用,但可能不是最优雅的方式)

test_project ├── entry_test_celery1.py ├── test_celery1 │ ├── __init__.py │ ├── task1.py │ └── tasks.py

其中entry_test_celery1.py作为入口程序和实际存放异步任务的文件夹test_celery1平级。如果是函数包的话,我习惯把所有的函数收拢到__init__.py下面,在这个case下所有的函数(任务taskx)收拢到tasks.py下面。

1 __init__.py初始化celery实例,指定本地的broker、backend(结果)和任务列表 from celery import Celery app = Celery('test_celery', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0', include=['test_celery1.tasks']) 2 task1.py放了一个简单的求和任务,耗时10秒 from . import app import time @app.task def my_background_task(arg1, arg2): # 两数相加 time.sleep(10) return arg1+arg2 3 tasks.py任务就是把所有的任务收拢起来(当前只有一个) from . task1 import my_background_task

以上三个文件组成的包完成了一个最简单的celery任务定义。

4 使用 entry_test_celery1.py调用celery任务。原文本来使用ready和result两个函数来进行轮询,我使用了AsyncResult来直接获取。 from test_celery1.tasks import my_background_task import time from celery.result import AsyncResult # 先启动test_celery1 # celery -A test_celery1.app worker --loglevel=info if __name__ == '__main__': task = my_background_task.delay(1, 2) task_id = task.id #此时,任务还未完成,它将返回False # print('Task finished? ', result.ready()) # print('Task result: ', result.result) print('Task finished? ', task_id) print('Task result: ', AsyncResult(id=task_id).get()) # 延长到10秒以确保任务已经完成 time.sleep(10) # 现在任务完成,ready方法将返回True print('Task finished? ', task_id) print('Task result: ', AsyncResult(id=task_id).get())

使用时先启动celery,切换到入口函数的当前目录,执行

celery -A test_celery1.app worker --loglevel=info

出现了熟悉的芹菜 在这里插入图片描述 然后执行python

python3 entry_test_celery1.py

在这里插入图片描述

2.2 和Flask结合

将入口函数改为一个简易的flask服务。

from test_celery1.tasks import my_background_task import time from celery.result import AsyncResult from flask import Flask app = Flask(__name__) @app.route('/test_add//', methods=['GET','POST']) def test_add(x,y): # 将请求塞入任务 task = my_background_task.delay(int(x),int(y)) # 返回任务信息 return 'Task Commited' + str(task.id) @app.route('/get_result/', methods=['GET','POST']) def get_result(task_id): print('Accompolished Status', AsyncResult(id=task_id).status) return 'Accompolished Status' + AsyncResult(id=task_id).status if __name__ == '__main__': app.run(debug=True)

调用接口计算 在这里插入图片描述

查看任务状态:第一次看还没有完成,过几秒就完成了 在这里插入图片描述

在这里插入图片描述 注意,启动服务的时候要用gunicorn启动

gunicorn entry_test_celery1_flask:app -b 0.0.0.0:5000

如果直接启动会报如下错误

python3 entry_test_celery1_flask.py

在这里插入图片描述 对应的要重新声明一个celery实例来处理

tem_celery = Celery(backend='redis://127.0.0.1:6379/0') @app.route('/get_result/', methods=['GET','POST']) def get_result(task_id): print('Accompolished Status', tem_celery.AsyncResult(id=task_id).status) return 'Accompolished Status' + tem_celery.AsyncResult(id=task_id).status

启动flower观察也是没有问题的

flower --basic_auth=admin:admin --broker=redis://127.0.0.1:6379/0 --address=0.0.0.0 --port=5556 3 搭建一个docker-compose项目进行应用 3.1 应用流程 1 处理请求提交的一个文件,计算文件摘要。将任务id和文件摘要返给请求者2 将文件处理好格式送给某个耗时函数处理,完成后存在本地。一个是data,一个是meta。meta保存了任务的状态,data则保留了数据,以摘要为文件名。

– 待续 –

3.2 项目打包

将耗时应用打包为任务,启动flask+ celery提供服务。

. ├── Dockerfile ├── app │ ├── __init__.py │ ├── app1 │ │ ├── __init__.py │ │ ├── view1.py │ │ └── view2.py │ ├── main │ │ ├── __init__.py │ │ └── views.py │ ├── static │ │ ├── img │ │ └── vendor │ └── templates │ ├── base.html │ └── main ├── config │ ├── celeryconfig.py │ └── config.py ├── data ├── entry_ner_cpu.py ├── env │ ├── entrypoint.sh │ └── requirements.txt ├── local_celery │ ├── __init__.py │ ├── celeryconfig.py │ ├── task1.py │ ├── task2.py │ ├── task3.py │ └── tasks.py

celery的任务在local_celery中定义,在app的__init__.py进行挂接

... # >>>>>>>>>>>>>>>>>>> 引入local_celery # 引入local_celery的应用 import local_celery.tasks as tt # 引入celery的任务结果位置 from celery import Celery celery_result_q = Celery(backend='redis://127.0.0.1:6379/0') ... 3.3 使用

测试view1.py,发送任务

# 导入蓝图名称 from . import app1 # 导入最基本的必要功能 from flask import render_template, jsonify, request # from app import local_celery_app from app import tt, celery_result_q # 调用任务 @app1.route('/test_sum/', methods=['GET', 'POST']) def test_sum(): res_dict = {} # 输入数据 input_dict = request.get_json() x = input_dict.get('x') y = input_dict.get('y') if not all([x,y]): res_dict['status'] = False res_dict['msg'] = '输入参数为x,y' return jsonify(res_dict) try: # 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果 task = tt.my_backend_task.delay(int(x), int(y)) except: res_dict['status'] = False res_dict['msg'] = '调用任务失败' return jsonify(res_dict) res_dict['status'] = True res_dict['msg'] = 'ok' res_dict['data'] = str(task.id) return jsonify(res_dict)

测试view2.py,获取任务结果

# 导入蓝图名称 from . import app1 # 导入最基本的必要功能 from flask import render_template, jsonify, request # from app import local_celery_app from app import tt, celery_result_q # 调用任务 @app1.route('/test_get_result/', methods=['GET', 'POST']) def test_get_result(): # 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果 res_dict = {} # 输入数据 input_dict = request.get_json() task_id = input_dict.get('task_id') if not all([task_id]): res_dict['status'] = False res_dict['msg'] = '输入参数为task_id' return jsonify(res_dict) try: # print('aaa') cal_status =celery_result_q.AsyncResult(id=task_id).status # print('bbb') # print('ccc') except: res_dict['status'] = False res_dict['msg'] = '获取任务状态失败' return jsonify(res_dict) if cal_status =='SUCCESS': cal_res = celery_result_q.AsyncResult(id=task_id).get() else: cal_res = '' res_dict['status'] = True res_dict['msg'] = '查询成功' res_dict['cal_status'] = cal_status res_dict['cal_res'] = cal_res return jsonify(res_dict)

测试结果:

import requests as req import json url_prefix = 'http://localhost:5000/app1/' # 1 发送任务 data = {'x': '433', 'y': 2} r = req.post(url_prefix + 'test_sum/', json=data).text print(json.loads(r)) task_id = json.loads(r).get('data') # 2 查询结果 # data = {'task_id': '938d5f4c-b559-4e36-874e-8be57d889e33'} data = {'task_id': task_id} r = req.post(url_prefix + 'test_get_result/', json=data).text print(json.loads(r)) --- In [31]: run test14_celery.py {'data': '60388eb4-1eaf-4a43-9ede-409fed152617', 'msg': 'ok', 'status': True} {'cal_res': '', 'cal_status': 'PENDING', 'msg': '查询成功', 'status': True} In [32]: r = req.post(url_prefix + 'test_get_result/', json=data).text ...: ...: print(json.loads(r)) ...: {'cal_res': '', 'cal_status': 'PENDING', 'msg': '查询成功', 'status': True} In [35]: r = req.post(url_prefix + 'test_get_result/', json=data).text ...: ...: print(json.loads(r)) ...: {'cal_res': 435, 'cal_status': 'SUCCESS', 'msg': '查询成功', 'status': True} 3.4 实际任务 3.4.1 原计划(失败)

原本的计划:

1 视图函数1:接受数据,返回摘要和任务号,并将任务送入计算队列2 视图函数2:查询任务状态/结果,如果完成会返回结果。(应用的结果使用task_id返回,明细使用摘要查询)3 任务1(task1):计算任务。根据传入的文本执行计算,返回去重后的结果和明细。4 任务2:存储。暂时合并在任务1,最好是结合回调来做。

碰到的问题:调用神经网络模型(BERT)celery无反应

从日志里能看到,任务接收到了,但是查询状态一直是PENDING。发现没有很好的文档来介绍celery深度的使用,我觉得大概率以后我会自己写一些关键功能。 在这里插入图片描述

3.4.2 备用方案

工作是不(可)能失败的

celery的本质是代替用户管理队列,当使用RabbitMQ作为Broker的时候celery会创建两个临时队列。现在我要做的就是直接使用消息队列。 在这里插入图片描述 备用方案以RabbitMQ作为队列,使用Pika连接MQ产生和消费消息。消息队列分为两个,一个类似Worker, 一个类似Manager。通过Flask提供接口服务。 使用的场景是这样的:

1 在本地起了一个Web Server,可以向消息队列发送请求,例如向模型请求一个结果。2 在本地起了一个Pika Server,处理Web Server发到队列中的消息,这个Server称为B Server。消息处理完成的结果会送到另一个消息队列。3 在本地起了一个Pika Server, 处理消息队列中的结果信息(由B Server发出),然后将这个结果保存到Web Server的静态目录下(或者Mongo数据库中/这个需求下传到Mysql也可以),这个Server称为A Server。

前台使用:

1 当需要解析一段文本的中的实体,使用者将请求以json的形式发到Web Server 2 Web Server计算这段文本的摘要,将计算请求封装为pickle文本序列,发往B Server 处理的消息队列。 3 隔一段时间用户就可以查询到计算结果(在数据库中)

后台处理:

1 B Server从消息队列中获取到待处理的原始数据,完成后向队列中写入 2 A Server从消息队列中获取处理完成的结果,向数据库写入

在RabbitMQ中定义了两个队列,q1_send是B Server 用的信道,用于接受原始数据,并进行处理。q1_back则是B Server处理后将结果发送到的信道,由A Server获取并进行处理,例如存数据库。 在这里插入图片描述

B Server (Worker)

B Server相当于Worker, 是处理任务的核心。

. ├── Dockerfile 构建Docker镜像的内容 ├── app 核心函数 │ └── bfuncs.py ├── config 配置 │ ├── model1 载入的模型配置文件 │ │ ├── model_info.json │ │ └── model_weights.h5 │ └── test.conf 其他基本配置文件 ├── data 本地存储的结果 │ └── test_digest.pkl 某一个文件处理后的结果(明细) ├── entry_bert_ner.py B Server的入口程序 ├── env 环境变量 requirments.txt是python的包列表 │ ├── entrypoint.sh │ └── requirements.txt ├── log 日志 ├── packages 自定义包

核心的处理函数 bfuncs.py

# 计算文本指标 def get_ner(**kw): msg_obj = kw.get('msg_obj') # 第一个参数是pkl字符串 data = dm.pickle.loads(kw.get('data')) filename = data.get('filename') digest = data.get('digest') tem_file = data.get('tem_file') # 模型处理 res_dict_x = extract_ner_from_text_lines(filename, digest, tem_file) # 本地保存。调用时路径是entry_bert_ner.py的路径 dm.to_pickle(res_dict_x, digest , path = './data/') # 消息发送 other_meta_dict = {} other_meta_dict['status'] = 'success' res_dict = {} res_dict['filename'] = filename res_dict['digest'] = digest res_dict['company_list'] = res_dict_x['company_list'] res_dict['start_time'] = res_dict_x['start_str'] res_dict['duration'] = res_dict_x['duration'] msg_obj.queue_key = 'q1_back_key' msg_obj.new_msg(res_dict, 'ner_result', other_meta_dict) dm.send_a_message(**dict(msg_obj))

B Server入口程序entry_bert_ner.py

import app.bfuncs as bf import os from datetime import datetime import pickle from functools import partial import DataManipulation as dm current_dir = os.path.abspath(os.path.dirname(__file__)) + '/app/' # # ===定义函数字典=== # # 例子 def helloworld_consume_func(**kw): for k in kw.keys(): print(k, ':', kw[k]) print('helloworld is handling') func_dict = {} func_dict['helloworld'] = helloworld_consume_func func_dict['get_ner'] = bf.get_ner # # 回调函数是一个有函数字典的偏函数 def callback_template(ch, method, properties, body, func_dict=None, msg_obj=None): # 解开外层pickle(meta可直接使用) input_data = pickle.loads(body) # 把消息对象加进去 input_data['msg_obj'] = msg_obj print('[B]*** Receving Message') # 测试 funcname = input_data.get('meta').get('handle_func') print('[I] MQ function ',funcname, 'Trying') func = func_dict[funcname] func(**input_data) if __name__ =='__main__': msg_config_dict = dm.read_conf('./config/test.conf').get('msgobj') msg_obj = dm.OMQ(**msg_config_dict) print(msg_config_dict) # 通过偏置将回调函数绑定函数字典 callback1 = partial(callback_template, func_dict=func_dict, msg_obj = msg_obj) # 创建连接和通道 connection = dm.make_mq_connection(**dict(msg_obj)) cur_channel = connection.channel() # 为通道绑定回调函数(字典) # 可绑定一条或多条信道 | A_server处理返回的消息 cur_channel.basic_consume(callback1, queue='q1_send', no_ack=True) print('[I] MQ_B_Server >>> Waiting For Message : ') # 进入无限循环等待 cur_channel.start_consuming()

这里需要考虑一个点,也是之前为啥想用celery的原因。

Ubuntu 主机使用无线网络会造成RabbitMQ的丢包。 这种丢包会导致消息被消费掉,但是没有动作。例如看起来B Server一直在处理消息,但实际上并没有。

所以,这种模式下每台主机上都要有自己的RabbitMQ,而不是共用一个公网的RabbitMQ。当然机器如果插了网线还是比较靠谱的,假设未来机器都可以插网线(或者至少无线网络比较稳定),那么可以连同一个公网的RabbitMQ。并且通过A Server端的管理,实现比较稳定的输出。

迁移到算网主机之前,再稍微调整一个docker-compose和Dockerfile文件。 docker-compose.yaml

version: '2' services: api: build: context: ./bert_ner/ dockerfile: Dockerfile restart: always volumes: - ./bert_ner/:/opt/app:rw

Dockerfile

FROM python:3.6 WORKDIR /opt/app COPY . . RUN pip install packages/DataManipulation-0.1.18.2-py3-none-any.whl RUN pip install -r env/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/ # EXPOSE 5000 ENTRYPOINT ["sh", "env/entrypoint.sh"]

拷到目标主机后需要build测试一下,有些函数包一开始可能没有理的很严格。

RabbitMQ

使用容器快速的搭建一个消息服务(但有些细节暂时没弄,例如用户、虚机的配置,还有诸如延时消息插件等),避免自己去装(因为还要装Erlang,不同版本的RabbitMQ和Erlang还有对应关系,很烦) docker-compose.yaml

version: '2' services: rabbitmq: image: rabbitmq:3.8.3-management container_name: rabbitmq restart: always hostname: myRabbitmq ports: - 15672:15672 - 5672:5672 volumes: - ./rabbit/data:/var/lib/rabbitmq environment: - RABBITMQ_DEFAULT_USER=YOURUSER - RABBITMQ_DEFAULT_PASS=YOURPASSWD A Server (Manager)

A Server 目前承担(汇总)结果的暂存,查询和数据库的写入等基本任务。未来A Server将会承担高级调度任务,例如并行、冗余的分发等。

A Server 处理 B Server(xprj10)返回到队列里的结果:

1 将汇总的结果数据保存在本地2 提供根据摘要查询结果的方法3 写入指定的数据库Mysql

A Server的项目结构和B Server相同,只是入口函数不同。 entry_bert_ner_aserver.py

... # 数据库连接地址 cfg_mysql = dm.read_conf('./config/test.conf').get('cfg_mysql') # 确保结果表会存在 sql_create_table =''' create table if not exists ner1 ( id INT(11) AUTO_INCREMENT, PRIMARY KEY(id), digests VARCHAR(64), ners varchar(1000), start_dt date, duration FLOAT, update_datetime date ); ''' print(cfg_mysql) cfg_mysql['port'] = int(cfg_mysql['port']) print('>>>>> ner1: ', dm.exe_sql(sql_create_table, cfg_mysql)) ...

这步确保之后要处理时,表是存在的 在这里插入图片描述

MySQL

本次的结果可以使用MySQL存储。(当然,本来最好是Mongo)

idfilenamedigestnersstart_dtdurationupdate_datetime自增ID文件名文件的sha256摘要实体列表字符串,varchar1000开始时间计算时长更新时间

建立一张ner1表来存储所有的结果

create table if not exists ner1 ( id INT(11) AUTO_INCREMENT, PRIMARY KEY(id), filename varchar(200), digests VARCHAR(64), ners varchar(1000), start_dt datetime, duration FLOAT, update_datetime datetime );

建mysql不需要Dockerfile, 只需要docker-compose.yaml,文件内容如下

version: '2' services: mysql: restart: always command: --default-authentication-plugin=mysql_native_password image: mysql:5.7.16 container_name: andy_mysql volumes: - ./mysql/env:/mydir - ./mysql/data:/var/lib/mysql - ./mysql/config/my.cnf:/etc/my.cnf # 数据库还原目录 可将需要还原的sql文件放在这里 - /docker/mysql/source:/docker-entrypoint-initdb.d environment: - "MYSQL_ROOT_PASSWORD=YOURPASS" - "MYSQL_DATABASE=YOURDB" - "TZ=Asia/Shanghai" ports: - 3306:3306

Note: 如果对应的库没有建可以先登录客户端建一个

# 查库 show databases; # 建库 create database YOURDB; # 删库 drop database YOURDB; Web Server(Flask)

Web Server 实现用户使用的具体操作入口,主要的功能有三个:

1 提供任务请求的接口函数,向B Server 发出请求 将请求的哈希值和时间保留在本地 2 提供任务结果的接口函数,向数据库发起请求,获得结果3 汇总计算收到总的请求数和当前数据库里的总数(任务完成百分比)

发起请求 view1.py

''' 参数示例: {'filename': 'xxxxx.txt', 'data': 'xxxxxxxx'} ''' # 调用任务 @app1.route('/publish_ner_task/', methods=['GET', 'POST']) def publish_ner_task(): print('>>>>>>a1') res_dict = {} # input input_dict = request.get_json() filename = input_dict.get('filename') # data为文本 data = input_dict.get('data') if not all([filename, data]): res_dict['status'] = False res_dict['msg'] = 'filename和data参数不能为空' return jsonify(res_dict) # 计算摘要 digest = dm.get_sha256_digest(data.encode()) res_dict['digest'] = digest res_dict['create_dt'] = dm.get_curdatetime_str() # 尝试发布任务 try: other_meta_dict = {} data_dict = {} data_dict['filename'] = filename data_dict['digest'] = digest # 根据换行符变成行列表 data_dict['tem_file'] = data.split('\n') # 发送消息到B Server msg_obj.new_msg(data_dict, 'get_ner', other_meta_dict) msg_obj.queue_key = 'q1_send_key' dm.send_a_message(**dict(msg_obj)) except: res_dict['status'] = False res_dict['msg'] = '任务发布失败' return jsonify(res_dict) # 发布成功才会走到这里 # 将哈希码追加到日志 dm.append_rec2log(digest,'./log/receives.log') res_dict['status'] = True res_dict['msg'] ='任务发布成功' return jsonify(res_dict)

获取结果 view2.py

# 调用任务 @app1.route('/get_ner_result/', methods=['GET', 'POST']) def get_ner_result(): print('>>>>>>a1') res_dict = {} # input input_dict = request.get_json() digest = input_dict.get('digest') if not digest: res_dict['status'] = False res_dict['msg'] ='查询结果必须提供摘要' return jsonify(res_dict) # 发起数据库请求 res_df = dm.mysql_read_table_where('ner1', "where digests ='%s'" % digest, cfg_mysql=cfg_mysql) if len(res_df): tem_dict = dict(res_df.iloc[-1]) df_dict ={} df_dict['filename'] = tem_dict['filename'] df_dict['digests'] = tem_dict['digests'] df_dict['ners'] = tem_dict['ners'] df_dict['start_dt'] = tem_dict['start_dt'].strftime('%Y-%m-%d %H:%M:%S') df_dict['duration'] = str(tem_dict['duration']) df_dict['update_datetime'] = tem_dict['update_datetime'].strftime('%Y-%m-%d %H:%M:%S') res_dict['data'] = df_dict res_dict['msg'] ='ok' else: res_dict['msg'] ='计算可能未完成' res_dict['status'] = True return jsonify(res_dict) ''' 返回示例 {'data': {'digests': '2272dd2ecbc080747b0a082c384871174f451f690fae1355828b734d03c2eea3', 'duration': '0.308103', 'filename': 'xxxxx.txt', 'ners': 'xxxxxx', 'start_dt': xxxx', 'update_datetime': 'xxxx'}, 'msg': 'ok', 'status': True} '''

统计任务完成百分比 view3.py

@app1.route('/check_task_local/', methods=['GET', 'POST']) def check_task_local(): res_dict = {} try: # 接收到的任务条数 count = 0 with open("./log/receives.log") as thefile: while True: buffer = thefile.read(1024*8192) if not buffer: break count += buffer.count('\n') # 检查数据库的条数 result_count = dm.mysql_query_counts('ner1',' ', cfg_mysql) res_dict['data'] = {} res_dict['data']['recevies'] = int(count) res_dict['data']['result'] = int(result_count) res_dict['data']['referenc_pct'] = float(result_count/count) except: res_dict['status'] =False res_dict['msg'] ='统计失败' return jsonify(res_dict) res_dict['status'] =True res_dict['msg'] ='统计成功' return jsonify(res_dict) ''' Result Code 200 {'data': {'recevies': 11, 'referenc_pct': 0.18181818181818182, 'result': 2}, 'msg': '统计成功', 'status': True} ''' 发起请求的处理流程 1 读取数据,以json形式post 2 Web Server 收到数据,返回回执(哈希码) 3 Web Server 向B Server 发起计算请求(q1_send队列) 4 B Server计算,将结果发往A Server(q1_back队列) 5 A Server将数据在本地缓存一份,并发往数据库保存

– 实现的代码待续 –

4 Next:

使用celery就是要实现高可用、高效率。前面的异步执行如果说实现高效率的话,那么超时、重试和回调等功能就是实现高可用。如果celery不好用的话,应该也可以自己实现。例如RabbitMQ本身就带了延时消息的功能(有版本要求,需要加个插件),基于这个功能很多事就可以做了:

1 超时。向B Server发消息的时候同时发一条延时消息到A Server, 延时消息中可以指定A 不再等待。2 重试。同样使用延时消息,发出消息一旦出现错误,则可以发出延时消息再次请求。3 回调。假设所有的任务都在B Server中(B Server也可以不止一个),当A收到B的某个带回调的结果后,可以再向B插入另一条指定的任务,从而实现回调。

基于超时、重试和回调的功能,可以进一步实现这些功能:

1 冗余分发。一条计算请求,可以同时发给3个B Server, 然后设定超时。在指定的时间内,A Server 会收集到可用的结果。在此基础上判断计算的结果是否正确,B Server的效率和可用性等。2 链式处理。每条计算请求可以指定下一跳,A Server收到结果后会将其按照其链条的顺序一步步的处理。3 并行处理。将普通的循环分为多个任务(每个任务一条消息),一次将所有的消息发往MQ。同时启动多个B Server,每个B Server将会占用一个进程处理任务。


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3