Python 全栈系列48 |
您所在的位置:网站首页 › celery异步结果多久会清理 › Python 全栈系列48 |
说明
有一些非常耗时的任务,无法实现实时的RPC调用。因此计划使用celery + flask提供异步任务调度服务。 一个请求的服务过程是这样: 1 服务器接到一个请求(一个几k到几百k的文本)2 服务器计算摘要作为键值,将其加入异步任务。3 服务器将摘要返回,状态为calculating。4 异步任务执行耗时计算,结果有两个副本。一个存在本地(pkl),一个发往目标服务器。这样如果目标服务器没收到,服务就把本地的副本读取再发送,避免重新计算。 1 流程与项目结构大体上是这样的结构,整个服务存在训练任务和预测任务。预测结果保存在本地和目标数据库。 启动的时候分别在三个终端输入命令: # 运行flask服务 gunicorn entry_ner_cpu:app -b 0.0.0.0:5001 # 使用celery执行异步任务 celery -A entry_ner_cpu.celery_ worker # 使用flower监督异步任务 flower --basic_auth=admin:admin --broker=redis://127.0.0.1:6379/0 --address=0.0.0.0 --port=5556作用分别是启动flask服务(基本web服务),启动celery服务(异步任务),启动flower服务(监控任务)。 1 flower的监控面板 2 执行任务接口 调用计算任务的接口。(该计算任务计算1+2) 3 查询任务结果的接口 使用任务ID获取结果。 有些文章也介绍了celery的配置和flask是有不同的,单独配置。 2.1 Celery 项目结构先把celery包装成一个类似函数包的方式,项目结构类似,内容参考这篇文章
其中entry_test_celery1.py作为入口程序和实际存放异步任务的文件夹test_celery1平级。如果是函数包的话,我习惯把所有的函数收拢到__init__.py下面,在这个case下所有的函数(任务taskx)收拢到tasks.py下面。 1 __init__.py初始化celery实例,指定本地的broker、backend(结果)和任务列表 from celery import Celery app = Celery('test_celery', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0', include=['test_celery1.tasks']) 2 task1.py放了一个简单的求和任务,耗时10秒 from . import app import time @app.task def my_background_task(arg1, arg2): # 两数相加 time.sleep(10) return arg1+arg2 3 tasks.py任务就是把所有的任务收拢起来(当前只有一个) from . task1 import my_background_task以上三个文件组成的包完成了一个最简单的celery任务定义。 4 使用 entry_test_celery1.py调用celery任务。原文本来使用ready和result两个函数来进行轮询,我使用了AsyncResult来直接获取。 from test_celery1.tasks import my_background_task import time from celery.result import AsyncResult # 先启动test_celery1 # celery -A test_celery1.app worker --loglevel=info if __name__ == '__main__': task = my_background_task.delay(1, 2) task_id = task.id #此时,任务还未完成,它将返回False # print('Task finished? ', result.ready()) # print('Task result: ', result.result) print('Task finished? ', task_id) print('Task result: ', AsyncResult(id=task_id).get()) # 延长到10秒以确保任务已经完成 time.sleep(10) # 现在任务完成,ready方法将返回True print('Task finished? ', task_id) print('Task result: ', AsyncResult(id=task_id).get())使用时先启动celery,切换到入口函数的当前目录,执行 celery -A test_celery1.app worker --loglevel=info出现了熟悉的芹菜 将入口函数改为一个简易的flask服务。 from test_celery1.tasks import my_background_task import time from celery.result import AsyncResult from flask import Flask app = Flask(__name__) @app.route('/test_add//', methods=['GET','POST']) def test_add(x,y): # 将请求塞入任务 task = my_background_task.delay(int(x),int(y)) # 返回任务信息 return 'Task Commited' + str(task.id) @app.route('/get_result/', methods=['GET','POST']) def get_result(task_id): print('Accompolished Status', AsyncResult(id=task_id).status) return 'Accompolished Status' + AsyncResult(id=task_id).status if __name__ == '__main__': app.run(debug=True)调用接口计算 查看任务状态:第一次看还没有完成,过几秒就完成了
如果直接启动会报如下错误 python3 entry_test_celery1_flask.py
启动flower观察也是没有问题的 flower --basic_auth=admin:admin --broker=redis://127.0.0.1:6379/0 --address=0.0.0.0 --port=5556 3 搭建一个docker-compose项目进行应用 3.1 应用流程 1 处理请求提交的一个文件,计算文件摘要。将任务id和文件摘要返给请求者2 将文件处理好格式送给某个耗时函数处理,完成后存在本地。一个是data,一个是meta。meta保存了任务的状态,data则保留了数据,以摘要为文件名。– 待续 – 3.2 项目打包将耗时应用打包为任务,启动flask+ celery提供服务。 . ├── Dockerfile ├── app │ ├── __init__.py │ ├── app1 │ │ ├── __init__.py │ │ ├── view1.py │ │ └── view2.py │ ├── main │ │ ├── __init__.py │ │ └── views.py │ ├── static │ │ ├── img │ │ └── vendor │ └── templates │ ├── base.html │ └── main ├── config │ ├── celeryconfig.py │ └── config.py ├── data ├── entry_ner_cpu.py ├── env │ ├── entrypoint.sh │ └── requirements.txt ├── local_celery │ ├── __init__.py │ ├── celeryconfig.py │ ├── task1.py │ ├── task2.py │ ├── task3.py │ └── tasks.pycelery的任务在local_celery中定义,在app的__init__.py进行挂接 ... # >>>>>>>>>>>>>>>>>>> 引入local_celery # 引入local_celery的应用 import local_celery.tasks as tt # 引入celery的任务结果位置 from celery import Celery celery_result_q = Celery(backend='redis://127.0.0.1:6379/0') ... 3.3 使用测试view1.py,发送任务 # 导入蓝图名称 from . import app1 # 导入最基本的必要功能 from flask import render_template, jsonify, request # from app import local_celery_app from app import tt, celery_result_q # 调用任务 @app1.route('/test_sum/', methods=['GET', 'POST']) def test_sum(): res_dict = {} # 输入数据 input_dict = request.get_json() x = input_dict.get('x') y = input_dict.get('y') if not all([x,y]): res_dict['status'] = False res_dict['msg'] = '输入参数为x,y' return jsonify(res_dict) try: # 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果 task = tt.my_backend_task.delay(int(x), int(y)) except: res_dict['status'] = False res_dict['msg'] = '调用任务失败' return jsonify(res_dict) res_dict['status'] = True res_dict['msg'] = 'ok' res_dict['data'] = str(task.id) return jsonify(res_dict)测试view2.py,获取任务结果 # 导入蓝图名称 from . import app1 # 导入最基本的必要功能 from flask import render_template, jsonify, request # from app import local_celery_app from app import tt, celery_result_q # 调用任务 @app1.route('/test_get_result/', methods=['GET', 'POST']) def test_get_result(): # 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果 res_dict = {} # 输入数据 input_dict = request.get_json() task_id = input_dict.get('task_id') if not all([task_id]): res_dict['status'] = False res_dict['msg'] = '输入参数为task_id' return jsonify(res_dict) try: # print('aaa') cal_status =celery_result_q.AsyncResult(id=task_id).status # print('bbb') # print('ccc') except: res_dict['status'] = False res_dict['msg'] = '获取任务状态失败' return jsonify(res_dict) if cal_status =='SUCCESS': cal_res = celery_result_q.AsyncResult(id=task_id).get() else: cal_res = '' res_dict['status'] = True res_dict['msg'] = '查询成功' res_dict['cal_status'] = cal_status res_dict['cal_res'] = cal_res return jsonify(res_dict)测试结果: import requests as req import json url_prefix = 'http://localhost:5000/app1/' # 1 发送任务 data = {'x': '433', 'y': 2} r = req.post(url_prefix + 'test_sum/', json=data).text print(json.loads(r)) task_id = json.loads(r).get('data') # 2 查询结果 # data = {'task_id': '938d5f4c-b559-4e36-874e-8be57d889e33'} data = {'task_id': task_id} r = req.post(url_prefix + 'test_get_result/', json=data).text print(json.loads(r)) --- In [31]: run test14_celery.py {'data': '60388eb4-1eaf-4a43-9ede-409fed152617', 'msg': 'ok', 'status': True} {'cal_res': '', 'cal_status': 'PENDING', 'msg': '查询成功', 'status': True} In [32]: r = req.post(url_prefix + 'test_get_result/', json=data).text ...: ...: print(json.loads(r)) ...: {'cal_res': '', 'cal_status': 'PENDING', 'msg': '查询成功', 'status': True} In [35]: r = req.post(url_prefix + 'test_get_result/', json=data).text ...: ...: print(json.loads(r)) ...: {'cal_res': 435, 'cal_status': 'SUCCESS', 'msg': '查询成功', 'status': True} 3.4 实际任务 3.4.1 原计划(失败)原本的计划: 1 视图函数1:接受数据,返回摘要和任务号,并将任务送入计算队列2 视图函数2:查询任务状态/结果,如果完成会返回结果。(应用的结果使用task_id返回,明细使用摘要查询)3 任务1(task1):计算任务。根据传入的文本执行计算,返回去重后的结果和明细。4 任务2:存储。暂时合并在任务1,最好是结合回调来做。碰到的问题:调用神经网络模型(BERT)celery无反应 从日志里能看到,任务接收到了,但是查询状态一直是PENDING。发现没有很好的文档来介绍celery深度的使用,我觉得大概率以后我会自己写一些关键功能。 工作是不(可)能失败的 celery的本质是代替用户管理队列,当使用RabbitMQ作为Broker的时候celery会创建两个临时队列。现在我要做的就是直接使用消息队列。 前台使用: 1 当需要解析一段文本的中的实体,使用者将请求以json的形式发到Web Server 2 Web Server计算这段文本的摘要,将计算请求封装为pickle文本序列,发往B Server 处理的消息队列。 3 隔一段时间用户就可以查询到计算结果(在数据库中) 后台处理: 1 B Server从消息队列中获取到待处理的原始数据,完成后向队列中写入 2 A Server从消息队列中获取处理完成的结果,向数据库写入 在RabbitMQ中定义了两个队列,q1_send是B Server 用的信道,用于接受原始数据,并进行处理。q1_back则是B Server处理后将结果发送到的信道,由A Server获取并进行处理,例如存数据库。 B Server相当于Worker, 是处理任务的核心。 . ├── Dockerfile 构建Docker镜像的内容 ├── app 核心函数 │ └── bfuncs.py ├── config 配置 │ ├── model1 载入的模型配置文件 │ │ ├── model_info.json │ │ └── model_weights.h5 │ └── test.conf 其他基本配置文件 ├── data 本地存储的结果 │ └── test_digest.pkl 某一个文件处理后的结果(明细) ├── entry_bert_ner.py B Server的入口程序 ├── env 环境变量 requirments.txt是python的包列表 │ ├── entrypoint.sh │ └── requirements.txt ├── log 日志 ├── packages 自定义包核心的处理函数 bfuncs.py # 计算文本指标 def get_ner(**kw): msg_obj = kw.get('msg_obj') # 第一个参数是pkl字符串 data = dm.pickle.loads(kw.get('data')) filename = data.get('filename') digest = data.get('digest') tem_file = data.get('tem_file') # 模型处理 res_dict_x = extract_ner_from_text_lines(filename, digest, tem_file) # 本地保存。调用时路径是entry_bert_ner.py的路径 dm.to_pickle(res_dict_x, digest , path = './data/') # 消息发送 other_meta_dict = {} other_meta_dict['status'] = 'success' res_dict = {} res_dict['filename'] = filename res_dict['digest'] = digest res_dict['company_list'] = res_dict_x['company_list'] res_dict['start_time'] = res_dict_x['start_str'] res_dict['duration'] = res_dict_x['duration'] msg_obj.queue_key = 'q1_back_key' msg_obj.new_msg(res_dict, 'ner_result', other_meta_dict) dm.send_a_message(**dict(msg_obj))B Server入口程序entry_bert_ner.py import app.bfuncs as bf import os from datetime import datetime import pickle from functools import partial import DataManipulation as dm current_dir = os.path.abspath(os.path.dirname(__file__)) + '/app/' # # ===定义函数字典=== # # 例子 def helloworld_consume_func(**kw): for k in kw.keys(): print(k, ':', kw[k]) print('helloworld is handling') func_dict = {} func_dict['helloworld'] = helloworld_consume_func func_dict['get_ner'] = bf.get_ner # # 回调函数是一个有函数字典的偏函数 def callback_template(ch, method, properties, body, func_dict=None, msg_obj=None): # 解开外层pickle(meta可直接使用) input_data = pickle.loads(body) # 把消息对象加进去 input_data['msg_obj'] = msg_obj print('[B]*** Receving Message') # 测试 funcname = input_data.get('meta').get('handle_func') print('[I] MQ function ',funcname, 'Trying') func = func_dict[funcname] func(**input_data) if __name__ =='__main__': msg_config_dict = dm.read_conf('./config/test.conf').get('msgobj') msg_obj = dm.OMQ(**msg_config_dict) print(msg_config_dict) # 通过偏置将回调函数绑定函数字典 callback1 = partial(callback_template, func_dict=func_dict, msg_obj = msg_obj) # 创建连接和通道 connection = dm.make_mq_connection(**dict(msg_obj)) cur_channel = connection.channel() # 为通道绑定回调函数(字典) # 可绑定一条或多条信道 | A_server处理返回的消息 cur_channel.basic_consume(callback1, queue='q1_send', no_ack=True) print('[I] MQ_B_Server >>> Waiting For Message : ') # 进入无限循环等待 cur_channel.start_consuming()这里需要考虑一个点,也是之前为啥想用celery的原因。 Ubuntu 主机使用无线网络会造成RabbitMQ的丢包。 这种丢包会导致消息被消费掉,但是没有动作。例如看起来B Server一直在处理消息,但实际上并没有。 所以,这种模式下每台主机上都要有自己的RabbitMQ,而不是共用一个公网的RabbitMQ。当然机器如果插了网线还是比较靠谱的,假设未来机器都可以插网线(或者至少无线网络比较稳定),那么可以连同一个公网的RabbitMQ。并且通过A Server端的管理,实现比较稳定的输出。 迁移到算网主机之前,再稍微调整一个docker-compose和Dockerfile文件。 docker-compose.yaml version: '2' services: api: build: context: ./bert_ner/ dockerfile: Dockerfile restart: always volumes: - ./bert_ner/:/opt/app:rwDockerfile FROM python:3.6 WORKDIR /opt/app COPY . . RUN pip install packages/DataManipulation-0.1.18.2-py3-none-any.whl RUN pip install -r env/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/ # EXPOSE 5000 ENTRYPOINT ["sh", "env/entrypoint.sh"]拷到目标主机后需要build测试一下,有些函数包一开始可能没有理的很严格。 RabbitMQ使用容器快速的搭建一个消息服务(但有些细节暂时没弄,例如用户、虚机的配置,还有诸如延时消息插件等),避免自己去装(因为还要装Erlang,不同版本的RabbitMQ和Erlang还有对应关系,很烦) docker-compose.yaml version: '2' services: rabbitmq: image: rabbitmq:3.8.3-management container_name: rabbitmq restart: always hostname: myRabbitmq ports: - 15672:15672 - 5672:5672 volumes: - ./rabbit/data:/var/lib/rabbitmq environment: - RABBITMQ_DEFAULT_USER=YOURUSER - RABBITMQ_DEFAULT_PASS=YOURPASSWD A Server (Manager)A Server 目前承担(汇总)结果的暂存,查询和数据库的写入等基本任务。未来A Server将会承担高级调度任务,例如并行、冗余的分发等。 A Server 处理 B Server(xprj10)返回到队列里的结果: 1 将汇总的结果数据保存在本地2 提供根据摘要查询结果的方法3 写入指定的数据库MysqlA Server的项目结构和B Server相同,只是入口函数不同。 entry_bert_ner_aserver.py ... # 数据库连接地址 cfg_mysql = dm.read_conf('./config/test.conf').get('cfg_mysql') # 确保结果表会存在 sql_create_table =''' create table if not exists ner1 ( id INT(11) AUTO_INCREMENT, PRIMARY KEY(id), digests VARCHAR(64), ners varchar(1000), start_dt date, duration FLOAT, update_datetime date ); ''' print(cfg_mysql) cfg_mysql['port'] = int(cfg_mysql['port']) print('>>>>> ner1: ', dm.exe_sql(sql_create_table, cfg_mysql)) ...这步确保之后要处理时,表是存在的 本次的结果可以使用MySQL存储。(当然,本来最好是Mongo) idfilenamedigestnersstart_dtdurationupdate_datetime自增ID文件名文件的sha256摘要实体列表字符串,varchar1000开始时间计算时长更新时间建立一张ner1表来存储所有的结果 create table if not exists ner1 ( id INT(11) AUTO_INCREMENT, PRIMARY KEY(id), filename varchar(200), digests VARCHAR(64), ners varchar(1000), start_dt datetime, duration FLOAT, update_datetime datetime );建mysql不需要Dockerfile, 只需要docker-compose.yaml,文件内容如下 version: '2' services: mysql: restart: always command: --default-authentication-plugin=mysql_native_password image: mysql:5.7.16 container_name: andy_mysql volumes: - ./mysql/env:/mydir - ./mysql/data:/var/lib/mysql - ./mysql/config/my.cnf:/etc/my.cnf # 数据库还原目录 可将需要还原的sql文件放在这里 - /docker/mysql/source:/docker-entrypoint-initdb.d environment: - "MYSQL_ROOT_PASSWORD=YOURPASS" - "MYSQL_DATABASE=YOURDB" - "TZ=Asia/Shanghai" ports: - 3306:3306Note: 如果对应的库没有建可以先登录客户端建一个 # 查库 show databases; # 建库 create database YOURDB; # 删库 drop database YOURDB; Web Server(Flask)Web Server 实现用户使用的具体操作入口,主要的功能有三个: 1 提供任务请求的接口函数,向B Server 发出请求 将请求的哈希值和时间保留在本地 2 提供任务结果的接口函数,向数据库发起请求,获得结果3 汇总计算收到总的请求数和当前数据库里的总数(任务完成百分比)发起请求 view1.py ''' 参数示例: {'filename': 'xxxxx.txt', 'data': 'xxxxxxxx'} ''' # 调用任务 @app1.route('/publish_ner_task/', methods=['GET', 'POST']) def publish_ner_task(): print('>>>>>>a1') res_dict = {} # input input_dict = request.get_json() filename = input_dict.get('filename') # data为文本 data = input_dict.get('data') if not all([filename, data]): res_dict['status'] = False res_dict['msg'] = 'filename和data参数不能为空' return jsonify(res_dict) # 计算摘要 digest = dm.get_sha256_digest(data.encode()) res_dict['digest'] = digest res_dict['create_dt'] = dm.get_curdatetime_str() # 尝试发布任务 try: other_meta_dict = {} data_dict = {} data_dict['filename'] = filename data_dict['digest'] = digest # 根据换行符变成行列表 data_dict['tem_file'] = data.split('\n') # 发送消息到B Server msg_obj.new_msg(data_dict, 'get_ner', other_meta_dict) msg_obj.queue_key = 'q1_send_key' dm.send_a_message(**dict(msg_obj)) except: res_dict['status'] = False res_dict['msg'] = '任务发布失败' return jsonify(res_dict) # 发布成功才会走到这里 # 将哈希码追加到日志 dm.append_rec2log(digest,'./log/receives.log') res_dict['status'] = True res_dict['msg'] ='任务发布成功' return jsonify(res_dict)获取结果 view2.py # 调用任务 @app1.route('/get_ner_result/', methods=['GET', 'POST']) def get_ner_result(): print('>>>>>>a1') res_dict = {} # input input_dict = request.get_json() digest = input_dict.get('digest') if not digest: res_dict['status'] = False res_dict['msg'] ='查询结果必须提供摘要' return jsonify(res_dict) # 发起数据库请求 res_df = dm.mysql_read_table_where('ner1', "where digests ='%s'" % digest, cfg_mysql=cfg_mysql) if len(res_df): tem_dict = dict(res_df.iloc[-1]) df_dict ={} df_dict['filename'] = tem_dict['filename'] df_dict['digests'] = tem_dict['digests'] df_dict['ners'] = tem_dict['ners'] df_dict['start_dt'] = tem_dict['start_dt'].strftime('%Y-%m-%d %H:%M:%S') df_dict['duration'] = str(tem_dict['duration']) df_dict['update_datetime'] = tem_dict['update_datetime'].strftime('%Y-%m-%d %H:%M:%S') res_dict['data'] = df_dict res_dict['msg'] ='ok' else: res_dict['msg'] ='计算可能未完成' res_dict['status'] = True return jsonify(res_dict) ''' 返回示例 {'data': {'digests': '2272dd2ecbc080747b0a082c384871174f451f690fae1355828b734d03c2eea3', 'duration': '0.308103', 'filename': 'xxxxx.txt', 'ners': 'xxxxxx', 'start_dt': xxxx', 'update_datetime': 'xxxx'}, 'msg': 'ok', 'status': True} '''统计任务完成百分比 view3.py @app1.route('/check_task_local/', methods=['GET', 'POST']) def check_task_local(): res_dict = {} try: # 接收到的任务条数 count = 0 with open("./log/receives.log") as thefile: while True: buffer = thefile.read(1024*8192) if not buffer: break count += buffer.count('\n') # 检查数据库的条数 result_count = dm.mysql_query_counts('ner1',' ', cfg_mysql) res_dict['data'] = {} res_dict['data']['recevies'] = int(count) res_dict['data']['result'] = int(result_count) res_dict['data']['referenc_pct'] = float(result_count/count) except: res_dict['status'] =False res_dict['msg'] ='统计失败' return jsonify(res_dict) res_dict['status'] =True res_dict['msg'] ='统计成功' return jsonify(res_dict) ''' Result Code 200 {'data': {'recevies': 11, 'referenc_pct': 0.18181818181818182, 'result': 2}, 'msg': '统计成功', 'status': True} ''' 发起请求的处理流程 1 读取数据,以json形式post 2 Web Server 收到数据,返回回执(哈希码) 3 Web Server 向B Server 发起计算请求(q1_send队列) 4 B Server计算,将结果发往A Server(q1_back队列) 5 A Server将数据在本地缓存一份,并发往数据库保存– 实现的代码待续 – 4 Next:使用celery就是要实现高可用、高效率。前面的异步执行如果说实现高效率的话,那么超时、重试和回调等功能就是实现高可用。如果celery不好用的话,应该也可以自己实现。例如RabbitMQ本身就带了延时消息的功能(有版本要求,需要加个插件),基于这个功能很多事就可以做了: 1 超时。向B Server发消息的时候同时发一条延时消息到A Server, 延时消息中可以指定A 不再等待。2 重试。同样使用延时消息,发出消息一旦出现错误,则可以发出延时消息再次请求。3 回调。假设所有的任务都在B Server中(B Server也可以不止一个),当A收到B的某个带回调的结果后,可以再向B插入另一条指定的任务,从而实现回调。基于超时、重试和回调的功能,可以进一步实现这些功能: 1 冗余分发。一条计算请求,可以同时发给3个B Server, 然后设定超时。在指定的时间内,A Server 会收集到可用的结果。在此基础上判断计算的结果是否正确,B Server的效率和可用性等。2 链式处理。每条计算请求可以指定下一跳,A Server收到结果后会将其按照其链条的顺序一步步的处理。3 并行处理。将普通的循环分为多个任务(每个任务一条消息),一次将所有的消息发往MQ。同时启动多个B Server,每个B Server将会占用一个进程处理任务。 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |