使用python将数据导入mysql的三种方法

您所在的位置:网站首页 python导入代码库 使用python将数据导入mysql的三种方法

使用python将数据导入mysql的三种方法

2023-08-04 02:19| 来源: 网络整理| 查看: 265

        最近经常要将数据导入到mysql中,我写过一次后也是复制粘贴前面写过的,但老是经常忘记写过的放哪去了,索性整理下直接写到博客里面来

方法:

      1、使用 pymysql 库, 数据一条条插入,或者用Django ORM里面的方法,数据批量插入

      2、使用 pandas 库,一次性插入,也可批量插入

      3、使用 pyspark, 一次性插入(可以不用建表,但是表没有注释, 即 mysql 的 COMMENT,要注释的话可以建空表)

方法1:

mysql 首先创建表

CREATE TABLE `data_to_mysql` (   `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '主键',   `name` VARCHAR(30) DEFAULT NULL COMMENT '姓名',   `sex` VARCHAR(20) DEFAULT NULL COMMENT '性别',   `age` int(5) DEFAULT NULL COMMENT '年龄',   PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='数据导入mysql';

 

import pymysql import pandas as pd mysql_host = 'localhost' mysql_db = 'test' mysql_user = 'root' mysql_pwd = '123456' mysql_table = 'data_to_mysql' class MYSQL: def __init__(self): # MySQL self.MYSQL_HOST = mysql_host self.MYSQL_DB = mysql_db self.MYSQ_USER = mysql_user self.MYSQL_PWD = mysql_pwd self.connect = pymysql.connect( host=self.MYSQL_HOST, db=self.MYSQL_DB, port=3306, user=self.MYSQ_USER, passwd=self.MYSQL_PWD, charset='utf8', use_unicode=False ) print(self.connect) self.cursor = self.connect.cursor() def insert_mysql(self, data_json): """ 数据插入mysql :param data_json: :return: """ sql = "insert into {}(`name`, `sex`, `age`) VALUES (%s, %s, %s)".format(mysql_table) try: self.cursor.execute(sql, (data_json['name'], data_json['sex'], data_json['age'])) self.connect.commit() print('数据插入成功') except Exception as e: print('e= ', e) print('数据插入错误') def main(): mysql = MYSQL() df = pd.DataFrame({ 'name': ['戴沐白','奥斯卡','唐三','小舞','马红俊','宁荣荣','朱竹清'], 'sex': ['男', '男', '男', '女', '男', '女', '女'], 'age': [23, 22, 21, 100000, 20, 20 ,20] }) # orient='records', 表示将DataFrame的数据转换成我想要的json格式 data_json = df.to_dict(orient='records') for dt in data_json: mysql.insert_mysql(dt) if __name__ == '__main__': main()

 

方法2

mysql 首先创建表

CREATE TABLE `data_to_mysql` (   `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '主键',   `name` VARCHAR(30) DEFAULT NULL COMMENT '姓名',   `sex` VARCHAR(20) DEFAULT NULL COMMENT '性别',   `age` int(5) DEFAULT NULL COMMENT '年龄',   PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='数据导入mysql';

import pandas as pd from sqlalchemy import create_engine mysql_host = 'localhost' mysql_db = 'test' mysql_user = 'root' mysql_pwd = '123456' mysql_table = 'data_to_mysql' def main(): engine = create_engine('mysql+pymysql://{}:{}@{}:3306/{}?charset=utf8'.format(mysql_user, mysql_pwd, mysql_host, mysql_db)) df = pd.DataFrame({ 'name': ['戴沐白','奥斯卡','唐三','小舞','马红俊','宁荣荣','朱竹清'], 'sex': ['男', '男', '男', '女', '男', '女', '女'], 'age': [23, 22, 21, 100000, 20, 20 ,20] }) # 表名 df.to_sql(mysql_table, con=engine, if_exists='append', index=False) """ to_sql参数:(比较重要) if_exists:表如果存在怎么处理 append:追加 replace:删除原表,建立新表再添加 fail:什么都不干 chunksize: 默认的话是一次性导入, 给值的话是批量导入,一批次导入多少 index=False:不插入索引index dtype 创建表结构 需要导入 import sqlalchemy dtype = {'id': sqlalchemy.types.BigInteger(), 'name': sqlalchemy.types.String(length=20), 'sex': sqlalchemy.types.String(length=20), 'age': sqlalchemy.types.BigInteger(), }) """ if __name__ == '__main__': main()

 

方法3

spark不能直接读Excel等数据,可以先将数据写入到json文件中

import pandas as pd import json def data_to_json(): """ 数据写入json :return: """ df = pd.DataFrame({ 'name': ['戴沐白', '奥斯卡', '唐三', '小舞', '马红俊', '宁荣荣', '朱竹清'], 'sex': ['男', '男', '男', '女', '男', '女', '女'], 'age': [23, 22, 21, 100000, 20, 20, 20] }) data_json = df.to_dict(orient='records') for dt in data_json: dt_string = json.dumps(dt, ensure_ascii=False) with open('data.json', 'a+', encoding='utf-8') as f: f.write('{}\n'.format(dt_string)) if __name__ == '__main__': data_to_json()

 

from pyspark import SQLContext from pyspark.sql import SparkSession import pyspark.sql.functions as F mysql_host = 'localhost' mysql_db = 'test' mysql_user = 'root' mysql_pwd = '123456' mysql_table = 'data_to_mysql' def sparkConfig(): """ spark的配置 :return: """ spark = SparkSession\ .builder\ .appName('data_to_mysql')\ .master('local[*]') \ .getOrCreate() sql = SQLContext(spark) return sql, spark def readJson(spark): data = spark.read.json('data.json') data.show() data.printSchema() print(dir(F)) # 两种方法都可以更改列名 # data.select(F.col('old_field1').alias('new_field1'), F.col('new_field2').alias('new_field2')).show() # data = data.withColumnRenamed('old_field', 'new_field') data.show() url = 'jdbc:mysql://{}:3306/{}?useUnicode=true&characterEncoding=utf8'.format(mysql_host, mysql_db) # 有中文,需处理中文乱码 table = mysql_table auth_mysql = {"user": mysql_user, "password": mysql_pwd} data.write.jdbc(url, table, mode='append', properties=auth_mysql) """ * `append`: 数据追加 * `overwrite`: 如果数据存在则覆盖 * `error`: 如果数据已经存在,则抛出异常。 * `ignore`: 如果数据已经存在,则忽略此操作。 """ def main(): sql, spark = sparkConfig() readJson(spark) if __name__ == '__main__': main()

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3