flask,蓝图使用,g对象,数据库连接池

您所在的位置:网站首页 flask数据库查询 flask,蓝图使用,g对象,数据库连接池

flask,蓝图使用,g对象,数据库连接池

2023-05-10 19:43| 来源: 网络整理| 查看: 265

内容回顾 cbv使用

写一个类继承MethodView写get,post…

类属性decorators=[auth]可以加装饰器

CBV执行流程

跟django的执行流程一样

endpoint作用 路径别名

add_url_rule(view_func=IndexView.as_view(‘index’))

为什么endpoint不传,是被路由装饰器装饰的函数名:函数名._name_

装饰器的执行先后顺序,从上到下执行

模板语法

使用的是jinja2更加强大,兼容django的dtl模板语法

请求响应

请求:全局request对象,在不同视图函数中尽管使用,不会错乱

method,path,files,form

响应四件套:字符串,render_template,redirect,jsonify,响应对象,make_response包裹一下四件套之一,

设置cookie,set_cookies

响应对象.headers 响应头

session使用

设置密钥

全局导入,直接赋值使用,取值

session执行流程

请求来了走open_session前端携带cookie到后端,后端取出来cookie对于的value值,解密,转到session对象中,后续在视图函数中,使用sesson即可

save_session:请求走的时候,校验session有没有被改过了,如果改过了,删除cookie重新设置cookie

session用起来像字典,如何做,一个对象可以像字典一样使用,_getitem_,__setitem__只要调用_setitem__就说明了,对象属性modify,一开始false只要触发_setitem,置为true,后期只要判断modify,就可以判断session有没有被改过

闪现

跨请求获取到之前请求存放的数据,取一次就没了,关注django中的message框架

放值:flash('%s,我错了'%name) 取值:get_flashed_messages()

请求扩展

before_request 请求来的时候

after_request 请求走的使用

before_first_request 项目启动后第一个请求来触发

teardown_request:错误日志记录

errorhandler:某种状体码,就会执行,http的状态码1xx,2xx,3xx,4xx,5xx

内容详情 蓝图的使用

blueprint翻译过来的,称之为蓝图

作用是:之前全在一个py中写flask项目,后期肯定要划分目录

不用蓝图划分目录

no_blueprint_flask # 项目名 src #核心源码位置 __init__.py # 包 里面实例化得到了app对象, models.py #放表模型 views.py # 放视图函数 static # 放静态资源 templates # 放模板 home.html # 模板 manage.py # 启动文件 蓝图的使用步骤 第一步:导入蓝图类 from flask import Blueprint 第二步:实例化得到蓝图对象 us=Blueprint('user',__name__) 第三步:在app中注册蓝图 app.register_blueprint(us) 第四步:在不同的views.py使用蓝图注册路由 @us.route('/login') 补充:蓝图可以有自己的静态文件和模板 补充:注册蓝图时,可以使用前缀,必须以/开头 使用蓝图,划分小型目录 little_blueprint # 项目名 -src # 核心代码 -static # 静态文件 -1.jpg # 图片 -templates # 模板文件 -user.html # 模板 -views # 视图函数存放位置 -order.py # 订单相关视图 -user.py # 用户相关视图 -__init__.py # 包 -models.py # 表模型 -manage.py # 启动文件 使用蓝图,划分大型项目目录,多个app,像django一样 big_blueprint # 项目名 -src # 核心文件 -admin # admin的app -static # 静态文件 -1.jpg # 图片 -templates # 模板文件目录 -admin_home.html # 模板文件 -__init__.py # 包 -models.py # 表模型 -views.py # 视图函数 -home # home app -order # orderapp -__init__.py # 包 -settings.py # 配置文件 -manage.py # 启动文件 g对象

g 对象 是什么

global的缩写,再python中是个关键字,不能以关键字作为变量名,干脆用了g

g 对象,再整个请求的全局,可以放值,可以取值

全局变量,再任意位置导入使用即可

他为什么不学django使用request作为上下文

因为使用request,可能会造成request数据的污染,不小心改了request的属性,但我们可能不知道

建议使用g 是空的放入之后再当次请求中全局优先

以后想在当次请求中,放入一些数据,后面使用,就可以使用g对象

g和session有什么区别

g 是只针对于当次请求

session针对于多次请求

from flask import Flask, g, request app = Flask(__name__) app.debug = True @app.before_request def before(): if 'home' in request.path: g.xx = 'xx' def add(a, b): # print('---',g.name) print('---', request.name) return a + b @app.route('/') def index(): print(g.xx) name = request.args.get('name') # g.name = name requesthod = name res = add(1, 2) print(res) return 'index' @app.route('/home') def home(): print(g.xx) return 'index' if __name__ == '__main__': app.run() 数据库连接池

flask操作pymysql

使用pymysql

再视图函数中,创建pymysql的连接,查数据,查完,返回给前端

有什么问题?来一个请求,创建一个连接,请求结束连接关闭(django就是这么做的)

把连接对象,做成全局的,再视图函数中,使用全局的连接,查询,返回给前端

有什么问题?会出现数据错乱

image-20230404120516198

测试全局连接结果数据错乱问题

import pymysql import threading import time conn = pymysql.connect(user='root', password="root", host='127.0.0.1', database='luffy01', port=3306) cursor = conn.cursor(pymysql.cursors.DictCursor) def task_1(): cursor.execute('select * from luffy_user') time.sleep(3) res = cursor.fetchall() print(res, "这是user表的数据") def task_2(): time.sleep(1) cursor.execute('select * from luffy_banner') time.sleep(5) res = cursor.fetchall() print(res, '这是banner表的数据') if __name__ == '__main__': t1 = threading.Thread(target=task_1) t2 = threading.Thread(target=task_2) t1.start() t2.start()

image-20230404185228226

解决上面的两个问题

数据库连接池

创建一个全局的池

每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数

使用第三方数据库连接池,使用步骤

安装pip install dbutils

使用:实例化的到一个池对象

在视图函数中导入使用

from dbutils.pooled_db import PooledDB import pymysql pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=10, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='root', database='luffy01', charset='utf8' ) conn = pool.connection() cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select * from luffy_banner limit 2') res = cursor.fetchall() print(res)

带池的代码

@app.route('/article_pool') def article_pool(): conn = pool.connection() cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select id,title,author_img from aritcle limit 2') res = cursor.fetchall() print(res) return jsonify(res)

不带池的代码

@app.route('/article') def article(): conn = pymysql.connect(user='root', password="", host='127.0.0.1', database='cnblogs', port=3306) cursor = conn.cursor(pymysql.cursors.DictCursor) time.sleep(random.randint(1,3)) cursor.execute('select id,title,author_img from aritcle limit 2') res = cursor.fetchall() cursor.close() conn.close() return jsonify(res)

压力测试

from threading import Thread import requests def task(): res = requests.get('http://127.0.0.1:5000/article_pool') print(len(res.text)) if __name__ == '__main__': for i in range(500): t = Thread(target=task) t.start()

效果是池的连接数明显小

不使用池连接数明显大

查看数据连接数

show status like 'Threads%'


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3