python 连接Mysql和hive

您所在的位置:网站首页 Python写数据到hive python 连接Mysql和hive

python 连接Mysql和hive

2024-07-16 16:50| 来源: 网络整理| 查看: 265

python 连接Mysql和hive

1、python 连接Mysql

import pymysql import pandas as pd # 连接database conn = pymysql.connect( host=“你的数据库地址”, port=3306, user=“用户名”,password=“密码”, database=“数据库名”, charset=“utf8”) # 得到一个可以执行SQL语句的光标对象 cursor = conn.cursor() # 执行完毕返回的结果集默认以元组显示 # 得到一个可以执行SQL语句并且将结果作为字典返回的游标 #cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 定义要执行的SQL语句 sql = """ show tables """ # 执行SQL语句 cursor.execute(sql) # 获取数据方式一: all_line = cursor.fetchall() # 获取数据方式二: result_df = pandas.read_sql(sql, con, index_col=None, coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None) # 关闭光标对象 cursor.close() # 关闭数据库连接 conn.close()

常用的参数是sql:SQL命令或者表名字,con:连接数据库的引擎,可以用SQLAlchemy或者pymysql建立,从数据库读数据的基本用法给出sql和con就可以了。其它都是默认参数,有特殊需求才会用到,有兴趣的话可以查看文档。

2、python 写入mysql

# engine = create_engine('mysql+pymysql://user:password@hosts:port/database?charset=utf8') # 查询语句,选出cp_settlepay_report表中的所有数据 # sql = 'select * from db_finance.cp_settlepay_report' # # read_sql_query的两个参数: sql语句, 数据库连接 # df = pd.read_sql_query(sql, engine) # 将新建的DataFrame储存为MySQL中的数据表,不储存index列(index=False) # if_exists: # 1.fail:如果表存在,啥也不做 # 2.replace:如果表存在,删了表,再建立一个新表,把数据插入 # 3.append:如果表存在,把数据插入,如果表不存在创建一个表!! # pd.io.sql.to_sql(df, 'example', con=engine, index=False, if_exists='replace') # df.to_sql('example', con=engine, if_exists='replace')这种形式也可以 import pandas as pd from sqlalchemy import create_engine engine = create_engine('mysql+pymysql://xiaowu:[email protected]:3306/db_finance?charset=utf8') try: # 执行sql pd.io.sql.to_sql(df,'cp_settlepay_report', engine, schema='db_finance', if_exists='append',index = False) except Exception as e: print(e) #有异常就回滚 engine.rollback() # 关闭链接 cursor.close() conn.close()

3、python 连接hive

Win7平台Python3使用安装impyla (1) pip install pure-sasl

(2) pip install thrift_sasl==0.2.1 --no-deps

(3) pip install thrift==0.9.3

(4) pip install impyla

执行数据库连接后,出现问题

ThriftParserError: ThriftPy does not support generating module with path in protocol ‘c’

定位到 \Lib\site-packages\thriftpy\parser\parser.py的

if url_scheme == '': with open(path) as fh: data = fh.read() elif url_scheme in ('http', 'https'): data = urlopen(path).read() else: raise ThriftParserError('ThriftPy does not support generating module ' 'with path in protocol \'{}\''.format( url_scheme))

更改为

if url_scheme == '': with open(path) as fh: data = fh.read() elif url_scheme in ('c', 'd','e','f''): with open(path) as fh: data = fh.read() elif url_scheme in ('http', 'https'): data = urlopen(path).read() else: raise ThriftParserError('ThriftPy does not support generating module ' 'with path in protocol \'{}\''.format( url_scheme))

执行数据库连接后,再次出现问题

TypeError: can’t concat str to bytes

定位到错误的最后一条,在\lib\site-packages\thrift_sasl_init_.py第94行

... header = struct.pack(">BI", status, len(body)) self._trans.write(header + body) ...

改为

... header = struct.pack(">BI", status, len(body)) if(type(body) is str): body = body.encode() self._trans.write(header + body) ...

在这里插入图片描述 连接hive

from impala.dbapi import connect from impala.util import as_pandas conn = connect(host='***', port=10000, auth_mechanism='PLAIN', user='***', password='***', database='***') cursor = conn.cursor() cursor.execute('show databases') result_df = as_pandas(cursor) # all_line = cursor.fetchall()

4、python 写入hive 关键流程主要分为两步:

(1)将pandas dataframe转换为sparkdataframe:这一步骤主要使用spark自带的接口:

spark_df = spark.createDataFrame(pd_df)

(2)将spark_df写入到hive的几种方式

spark_df.write.mode('overwrite').format("hive").saveAsTable("dbname.tablename")

完整代码:

import pandas as pd import numpy as np from pyspark import SparkContext,SparkConf from pyspark.sql import HiveContext,SparkSession from pyspark.sql import SQLContext pd_df = pd.DataFrame(np.random.randint(0,10,(3,4)),columns=['a','b','c']) spark = SparkSession.builder.appName('pd_2_hive').master('local').enableHiveSupport().getOrCreate() spark_df = spark.createDataFrame(pd_df) #spark dataframe 有接口可以直接写入到hive spark_df.write.mode('overwrite').format("hive").saveAsTable("dbname.tablename") ''' 其中 overwrite 代表如果表中存在数据,那么新数据会将原来的数据覆盖,此外还有append等模式,详细介绍如下: * `append`: Append contents of this :class:`DataFrame` to existing data. * `overwrite`: Overwrite existing data. * `error` or `errorifexists`: Throw an exception if data already exists. * `ignore`: Silently ignore this operation if data already exists. ''' #此外还可以将spark_df 注册为临时表,之后通过sql的方式写到hive里 spark_df.registerTempTable('tmp_table') tmp_sql = '''create table dbname.tablename as select * from tmp_table''' spark.sql(tmp_sql) spark.stop()


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3