Flask+mysql 实现增删改查接口开发+测试(图文教程附源码)

Flask+mysql 实现增删改查接口开发+测试(图文教程附源码)

2023-08-21 06:28| 来源: 网络整理| 查看: 265




2.1 安装python

2.2 安装mysql 


3.1 安装pycharm

3.2 安装Navicat

3.3 安装postman


4.1 Navicat连接

4.2 数据库准备


5.1 表的增删改查操作 

5.2 增加接口服务

5.3 删除接口服务

5.4 修改接口服务

5.5 查询接口服务


 6.1 测试增加接口服务

 6.2 测试删除接口服务

 6.3 测试修改接口服务

 6.4 测试查询接口服务





       1.1 之前在练习了Django的博客项目,前后端不分离的,现在就想着把Flask框架也再熟悉一下,后续把接口返回的数据用vue显示。python 比较流行的框架是Django 和Flask,重要性不言而喻

       1.2 刚好是五一,发文符合劳动节精神,深圳又下雨,在家搬砖

二、环境准备 2.1 安装python


python -V

本文内容运行在python 3.6.1

2.2 安装mysql 

       确保你的电脑安装了mysql,并正确设置账号和密码,本机账号和密码为 root/root

mysql -V

本文内容mysql 5.7中运行通过

三、工具准备 3.1 安装pycharm

       确保你的电脑安装了pycharm,我这里用的是pycharm professional 2017.3(已pojie)

3.2 安装Navicat


3.3 安装postman


四、mysql数据库准备 4.1 Navicat连接


4.2 数据库准备





       开发前先做一些准备工作,新建flask项目,file ->new project


控制台会打印 访问地址,点击访问 浏览器页面显示 Hello World!,我这里代码删了不贴图了。



import pymysql from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask import make_response,request from flask_cors import CORS pymysql.install_as_MySQLdb()



app = Flask(__name__) # ------------------database---------------------------- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:root@localhost:3306/books' # 指定数据库文件 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True # 允许修改跟踪数据库 db = SQLAlchemy(app)


class Books(db.Model): id = db.Column(db.Integer, primary_key=True, comment='自动递增id,唯一键') title = db.Column(db.String(80), nullable=False, comment='书名') author = db.Column(db.String(120), nullable=False, comment='作者') read_status = db.Column(db.Boolean, comment='阅读状态,0未读,1已读')


db.create_all() # 创建表(表创建好后可注释掉)



5.1 表的增删改查操作 


# 增加数据 def insert_data(title, author, read_status): book = Books(title=title, author=author, read_status=read_status) db.session.add_all([book]) db.session.commit() # 查询所有 def select_data_all(): book_list = [] books = Books.query.all() # 类似于 select * from Books for s in books: dic = {} dic['id'] = s.id dic['title'] = s.title dic['author'] = s.author dic['read_status'] = s.read_status book_list.append(dic) return book_list # 通过id查询 def select_data_by_id(id): book = Books.query.get(id) if not book: return False dic = {} dic['id'] = book.id dic['title'] = book.title dic['author'] = book.author dic['read_status'] = book.read_status return dic # 通过id删除数据 def delete_data(id): # 类似于 select * from Books where id = id delete_id = Books.query.get(id) if not delete_id: return False db.session.delete(delete_id) db.session.commit() # 提交操作到数据库 # 修改数据 def update_data(id, title='', author='', read_status='', new_id=''): book = Books.query.get(id) if not title == '': book.title = title if not author == '': book.author = author if not read_status == '': book.read_status = read_status if not new_id == '': book.id = new_id db.session.commit()


# 解决浏览器浏览器访问输出乱码问题 app.config['JSON_AS_ASCII'] = False @app.after_request def after(resp): resp = make_response(resp) resp.headers['Access-Control-Allow-Origin'] = '*' # 允许跨域地址 resp.headers['Access-Control-Allow-Methods'] = '*' # 请求 ‘*’ 就是全部 resp.headers['Access-Control-Allow-Headers'] = 'x-requested-with,content-type' # 头部 resp.headers['Access-Control-Allow-Credentials'] = 'True' return resp CORS(app, resources=r'/*', supports_credentials=True) 5.2 增加接口服务




# 前端通过传参title、author、read_status增加书籍 @app.route('/add', methods=['POST']) def add(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用add方传过来的参数是', post_data) book_list = select_data_all() for i in range(len(book_list)): title_list = book_list[i]['title'] if post_data.get('title') in title_list: response_object['message'] = '书名(title)重复!' response_object["status"]= 'fail' return response_object if post_data.get('title') is None: response_object['message'] = 'title是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('author') is None: response_object['message'] = 'author是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('read_status') is None: response_object['message'] = 'read_status是必传参数!' response_object["status"]= 'fail' return response_object title = str(post_data.get('title')).strip(), author = str(post_data.get('author')).strip(), read_status = post_data.get('read_status') if title[0] is None or title[0] is '': response_object['message'] = 'title不能为空!' response_object["status"] = 'fail' return response_object if author[0] is None or author[0] is '': response_object['message'] = '作者不能为空!' response_object["status"] = 'fail' return response_object if read_status != 0 and read_status != 1: response_object['message'] = '阅读状态只能为0和1!' response_object["status"] = 'fail' return response_object insert_data(title=title[0], author=author[0], read_status=read_status) response_object['message'] = '图书添加成功!' return response_object


使用not in检查必需的参数是否存在,如果不存在则返回HTTP状态码400(Bad Request)。对于空字符串的判断,使用not title和not author,而不是与None进行比较。使用int(post_data['read_status'])将read_status转换为整数类型。为了避免重复的书名,提取已存在的书名列表,并使用列表推导式构建title_list。在返回错误响应时,包含了HTTP状态码400。 @app.route('/add', methods=['POST']) def add(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用add方传过来的参数是', post_data) if 'title' not in post_data: response_object['message'] = 'title是必传参数!' response_object['status'] = 'fail' return response_object, 400 if 'author' not in post_data: response_object['message'] = 'author是必传参数!' response_object['status'] = 'fail' return response_object, 400 if 'read_status' not in post_data: response_object['message'] = 'read_status是必传参数!' response_object['status'] = 'fail' return response_object, 400 title = post_data['title'].strip() author = post_data['author'].strip() read_status = int(post_data['read_status']) if not title: response_object['message'] = 'title不能为空!' response_object['status'] = 'fail' return response_object, 400 if not author: response_object['message'] = '作者不能为空!' response_object['status'] = 'fail' return response_object, 400 if read_status not in (0, 1): response_object['message'] = '阅读状态只能为0和1!' response_object['status'] = 'fail' return response_object, 400 book_list = select_data_all() title_list = [book['title'] for book in book_list] if title in title_list: response_object['message'] = '书名(title)重复!' response_object['status'] = 'fail' return response_object, 400 insert_data(title=title, author=author, read_status=read_status) response_object['message'] = '图书添加成功!' return response_object

5.3 删除接口服务


# 前端通过传id删除书籍 @app.route('/delete', methods=['DELETE']) def delete(): response_object = {'status': 'success'} if requesthod == 'DELETE': post_data = request.get_json() print('调用delete方传过来的参数是:', post_data) if post_data.get('id') is None: response_object['message'] = 'id是必传参数!' response_object["status"]= 'fail' return response_object id = post_data.get('id') result = delete_data(id) # 删除方法调用 if result is False: response_object['message'] = '需要删除的图书不存在!' response_object["status"] = 'fail' return response_object else: response_object['message'] = '图书被删除!' return response_object


不再使用== None进行空值判断,而是使用not in检查键是否存在于字典中。对于缺少必需的id参数,返回了HTTP状态码400(Bad Request)。对于找不到需要删除的图书,返回了HTTP状态码404(Not Found)。使用not result判断删除方法的结果,而不是使用== False。省略了不必要的response_object["status"] = 'success',因为默认状态就是成功。 @app.route('/delete', methods=['POST']) def delete(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用delete方传过来的参数是:', post_data) if 'id' not in post_data: response_object['message'] = 'id是必传参数!' response_object['status'] = 'fail' return response_object, 400 id = post_data['id'] result = delete_data(id) # 删除方法调用 if not result:# 这行代码有bug response_object['message'] = '需要删除的图书不存在!' response_object['status'] = 'fail' return response_object, 404 response_object['message'] = '图书被删除!' return response_object


if not result:

会导致的效果是调用删除接口,但是实际上返回   需要删除的图书不存在!


if result is False:




5.4 修改接口服务


# 前端通过传参title、author、read_status修改书籍 @app.route('/update', methods=['POST']) def update(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用update方传过来的参数是', post_data) if post_data.get('id') is None: response_object['message'] = 'id是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('title') is None: response_object['message'] = 'title是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('author') is None: response_object['message'] = 'author是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('read_status') is None: response_object['message'] = 'read_status是必传参数!' response_object["status"]= 'fail' return response_object # 查询所有数据 book_list = select_data_all() # 拼接所有的id到列表 for i in range(len(book_list)): id_list = book_list[i]['id'] # 判断书籍id在不在列表内 if post_data.get('id') is not id_list and int(post_data.get('id')) is not id_list: response_object['message'] = '需要修改的书籍id不存在!' response_object["status"]= 'fail' return response_object title = str(post_data.get('title')).strip(), author = str(post_data.get('author')).strip(), read_status = post_data.get('read_status') if title[0] is None or title[0] is '': response_object['message'] = 'title不能为空!' response_object["status"] = 'fail' return response_object if author[0] is None or author[0] is '': response_object['message'] = '作者不能为空!' response_object["status"] = 'fail' return response_object if read_status != 0 and read_status != 1: response_object['message'] = '阅读状态只能为0和1!' response_object["status"] = 'fail' return response_object books_id = post_data.get('id') title = post_data.get('title') author = post_data.get('author') read_status = post_data.get('read_status') update_data(id=books_id, title=title, author=author, read_status=read_status) response_object['message'] = '图书已更新!' return response_object


使用 required_fields 列表来存储必传参数,遍历检查是否存在。使用列表推导式生成 id_list,避免在循环中重复拼接。使用 not title 和 not author 的形式来检查是否为空,更简洁。使用列表形式 [0, 1] 来检查阅读状态,避免使用多个不等式判断。删除不必要的 response_object 变量。不需要显式地指定 HTTP 响应状态码,Flask 会根据返回的 JSON 对象自动设置状态码 from flask import json @app.route('/update', methods=['POST']) def update(): if requesthod == 'POST': post_data = request.get_json() print('调用update方传过来的参数是', post_data) required_fields = ['id', 'title', 'author', 'read_status'] for field in required_fields: if field not in post_data: return json.dumps({ 'status': 'fail', 'message': f'{field}是必传参数!' }), 400 book_list = select_data_all() id_list = [book['id'] for book in book_list] if post_data['id'] not in id_list: return json.dumps({ 'status': 'fail', 'message': '需要修改的书籍id不存在!' }), 404 title = str(post_data['title']).strip() author = str(post_data['author']).strip() #read_status = post_data['read_status']#这里需要强制转换不然前端代码报错 read_status = int(post_data['read_status']) if not title: return json.dumps({ 'status': 'fail', 'message': 'title不能为空!' }), 400 if not author: return json.dumps({ 'status': 'fail', 'message': '作者不能为空!' }), 400 if read_status not in [0, 1]: return json.dumps({ 'status': 'fail', 'message': '阅读状态只能为0和1!' }), 400 books_id = post_data['id'] update_data(id=books_id, title=title, author=author, read_status=read_status) return json.dumps({ 'status': 'success', 'message': '图书已更新!' }) 5.5 查询接口服务



# 前端通过不传参默认查询所有书籍,传id查询对应书籍 @app.route('/query', methods=['POST']) def query(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用query方传过来的参数是', post_data) if post_data.get('id') is None: books = select_data_all() response_object['message'] = '查询所有书籍成功!' response_object['books'] = books return response_object id = str(post_data.get('id')).strip() if id is None or id is '': response_object['message'] = 'id不能为空!' response_object["status"] = 'fail' return response_object book = select_data_by_id(id) if book is False: response_object['message'] = '需要查询的图书不存在!' response_object["status"] = 'fail' return response_object else: response_object['message'] = '查询书籍成功!' response_object['books'] = book return response_object


移除了冗余的条件判断和注释。使用is None进行空值检查,而不是与None进行比较。将空字符串的判断条件改为id == '' or id is None,使代码更加简洁。在最后直接返回response_object,避免重复的return语句。

@app.route('/query', methods=['POST']) def query(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用query方传过来的参数是', post_data) id = str(post_data.get('id')).strip() if id == '' or id is None: books = select_data_all() response_object['message'] = '查询所有图书成功!' response_object['data'] = books return response_object book = select_data_by_id(id) if book is False: response_object['message'] = '需要查询的图书不存在!' response_object["status"] = 'fail' else: response_object['message'] = '图书查询成功!' response_object['data'] = book return response_object






insert_data("《水浒传》", "吴承恩", 1)


 6.1 测试增加接口服务



{ "author": "保尔柯察金", "read_status":0, "title": "《钢铁是怎么炼成的》" }



在这个基础上再次 send



 6.2 测试删除接口服务



{ "id":60 }


在这个基础上再次 send



 6.3 测试修改接口服务




{ "id":4, "author": "罗贯中1", "read_status":0, "title": "《三国演义1》" }




 6.4 测试查询接口服务



{ }



{ "books": [ { "author": "施耐庵", "id": 1, "read_status": true, "title": "《西游记》" }, { "author": "曹雪芹", "id": 2, "read_status": false, "title": "《红楼梦》" }, { "author": "吴承恩", "id": 3, "read_status": true, "title": "《水浒传》" }, { "author": "罗贯中1", "id": 4, "read_status": false, "title": "《三国演义1》" } ], "message": "查询所有书籍成功!", "status": "success" }




{ "id":4 }

结果 ,查询成功


其他的测试场景这里也不列了,大家可以试下 ,比如传不存在的图书id





九、附录(源代码) # -*- coding: utf-8 -*- # @Author : Liqiju # @Time : 2022/5/1 2:45 # @File : app.py # @Software: PyCharm import pymysql from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask import make_response,request from flask_cors import CORS pymysql.install_as_MySQLdb() app = Flask(__name__) # ------------------database---------------------------- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:root@localhost:3306/books' # 指定数据库文件 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True # 允许修改跟踪数据库 db = SQLAlchemy(app) class Books(db.Model): id = db.Column(db.Integer, primary_key=True, comment='自动递增id,唯一键') title = db.Column(db.String(80), nullable=False, comment='书名') author = db.Column(db.String(120), nullable=False, comment='作者') read_status = db.Column(db.Boolean, comment='阅读状态,0未读,1已读') # 增加数据 def insert_data(title, author, read_status): book = Books(title=title, author=author, read_status=read_status) db.session.add_all([book]) db.session.commit() # 查询所有 def select_data_all(): book_list = [] books = Books.query.all() # 类似于 select * from Books for s in books: dic = {} dic['id'] = s.id dic['title'] = s.title dic['author'] = s.author dic['read_status'] = s.read_status book_list.append(dic) return book_list # 通过id查询 def select_data_by_id(id): book = Books.query.get(id) if not book: return False dic = {} dic['id'] = book.id dic['title'] = book.title dic['author'] = book.author dic['read_status'] = book.read_status return dic # 通过id删除数据 def delete_data(id): # 类似于 select * from Books where id = id delete_id = Books.query.get(id) if not delete_id: return False db.session.delete(delete_id) db.session.commit() # 提交操作到数据库 # 修改数据 def update_data(id, title='', author='', read_status='', new_id=''): book = Books.query.get(id) if not title == '': book.title = title if not author == '': book.author = author if not read_status == '': book.read_status = read_status if not new_id == '': book.id = new_id db.session.commit() # 解决浏览器浏览器访问输出乱码问题 app.config['JSON_AS_ASCII'] = False @app.after_request def after(resp): resp = make_response(resp) resp.headers['Access-Control-Allow-Origin'] = '*' # 允许跨域地址 resp.headers['Access-Control-Allow-Methods'] = '*' # 请求 ‘*’ 就是全部 resp.headers['Access-Control-Allow-Headers'] = 'x-requested-with,content-type' # 头部 resp.headers['Access-Control-Allow-Credentials'] = 'True' return resp CORS(app, resources=r'/*', supports_credentials=True) # 前端通过传参title、author、read_status增加书籍 @app.route('/add', methods=['POST']) def add(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用add方传过来的参数是', post_data) book_list = select_data_all() for i in range(len(book_list)): title_list = book_list[i]['title'] if post_data.get('title') in title_list: response_object['message'] = '书名(title)重复!' response_object["status"]= 'fail' return response_object if post_data.get('title') is None: response_object['message'] = 'title是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('author') is None: response_object['message'] = 'author是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('read_status') is None: response_object['message'] = 'read_status是必传参数!' response_object["status"]= 'fail' return response_object title = str(post_data.get('title')).strip(), author = str(post_data.get('author')).strip(), read_status = post_data.get('read_status') if title[0] is None or title[0] is '': response_object['message'] = 'title不能为空!' response_object["status"] = 'fail' return response_object if author[0] is None or author[0] is '': response_object['message'] = '作者不能为空!' response_object["status"] = 'fail' return response_object if read_status != 0 and read_status != 1: response_object['message'] = '阅读状态只能为0和1!' response_object["status"] = 'fail' return response_object insert_data(title=title[0], author=author[0], read_status=read_status) response_object['message'] = '图书添加成功!' return response_object # 前端通过传id删除书籍 @app.route('/delete', methods=['DELETE']) def delete(): response_object = {'status': 'success'} if requesthod == 'DELETE': post_data = request.get_json() print('调用delete方传过来的参数是:', post_data) if post_data.get('id') is None: response_object['message'] = 'id是必传参数!' response_object["status"]= 'fail' return response_object id = post_data.get('id') result = delete_data(id) # 删除方法调用 if result is False: response_object['message'] = '需要删除的图书不存在!' response_object["status"] = 'fail' return response_object else: response_object['message'] = '图书被删除!' return response_object # 前端通过传参title、author、read_status修改书籍 @app.route('/update', methods=['POST']) def update(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用update方传过来的参数是', post_data) if post_data.get('id') is None: response_object['message'] = 'id是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('title') is None: response_object['message'] = 'title是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('author') is None: response_object['message'] = 'author是必传参数!' response_object["status"]= 'fail' return response_object if post_data.get('read_status') is None: response_object['message'] = 'read_status是必传参数!' response_object["status"]= 'fail' return response_object # 查询所有数据 book_list = select_data_all() # 拼接所有的id到列表 for i in range(len(book_list)): id_list = book_list[i]['id'] # 判断书籍id在不在列表内 if post_data.get('id') is not id_list and int(post_data.get('id')) is not id_list: response_object['message'] = '需要修改的书籍id不存在!' response_object["status"]= 'fail' return response_object title = str(post_data.get('title')).strip(), author = str(post_data.get('author')).strip(), read_status = post_data.get('read_status') if title[0] is None or title[0] is '': response_object['message'] = 'title不能为空!' response_object["status"] = 'fail' return response_object if author[0] is None or author[0] is '': response_object['message'] = '作者不能为空!' response_object["status"] = 'fail' return response_object if read_status != 0 and read_status != 1: response_object['message'] = '阅读状态只能为0和1!' response_object["status"] = 'fail' return response_object books_id = post_data.get('id') title = post_data.get('title') author = post_data.get('author') read_status = post_data.get('read_status') update_data(id=books_id, title=title, author=author, read_status=read_status) response_object['message'] = '图书已更新!' return response_object # 前端通过不传参默认查询所有书籍,传id查询对应书籍 @app.route('/query', methods=['POST']) def query(): response_object = {'status': 'success'} if requesthod == 'POST': post_data = request.get_json() print('调用query方传过来的参数是', post_data) if post_data.get('id') is None: books = select_data_all() response_object['message'] = '查询所有书籍成功!' response_object['books'] = books return response_object id = str(post_data.get('id')).strip() if id is None or id is '': response_object['message'] = 'id不能为空!' response_object["status"] = 'fail' return response_object book = select_data_by_id(id) if book is False: response_object['message'] = '需要查询的图书不存在!' response_object["status"] = 'fail' return response_object else: response_object['message'] = '查询书籍成功!' response_object['books'] = book return response_object if __name__ == '__main__': # 默认是5000,这里设置5001避免本地冲突。打开debug方便调试 # db.create_all() # 创建表(表创建好后可注释掉) # insert_data("《水浒传》", "吴承恩", 1) # 利用这个可以添加数据或者直接数据库手动加入 # 注意这个时候没开启校验title唯一性是因为不是前端过来的请求 app.run(debug=True, port=5001)

刚才试了在python 3.9版本中,“is” 要改为 “== ”号,不然报错 





