0
点赞
收藏
分享

微信扫一扫

python连接mysql、hive、clickhouse数据库封装类(2023年2月)


写这篇笔记的背景:

现在的数据库种类有很多,有数据仓库hive类型、有mysql数据库类型、也有ClickHouse数据库类型,针对不同的数据库类型,python读取的方式和代码都不一样。所以本文以读取数据库操作为例子,将这些连接不同数据库的操作整合成类。便于代码的使用和扩展。代码可以直接复制过去,修改ip、账号、密码、端口后直接使用

文章目录

  • ​​工具库sqlalchemy简单介绍​​
  • ​​读取mysql数据的最小单元例子​​
  • ​​连接mysql的代码封装成一个类​​
  • ​​读取hive数据的最小单元例子,需要用到impala​​
  • ​​连接hive的代码封装成一个类​​
  • ​​读取ClickHouse数据库的最小单元例子​​
  • ​​连接ClickHouse的代码封装成一个类​​

工具库sqlalchemy简单介绍

需要用到的工具包是pandas和sqlalchemy。pandas是python的一种数据分析库。sqlalchemy是Python中最有名的ORM工具,特点是操纵Python对象而不是SQL查询,也就是在代码层面考虑的是对象,而不是SQL,体现的是一种程序化思维,这样使得Python程序更加简洁易读。

例如下面的​​User​​类表示的就是

包含​​id​​​和​​name​​​的​​user​​表

class User(object):
def __init__(self, id, name):
self.id = id
self.name = name
# 下面就是user表
[
User('1', 'Michael'),
User('2', 'Bob'),
User('3', 'Adam')
]

我们可以通过sqlalchemy连接mysql数据库,然后进行各种增删改查,这里只演示查询读取数据。

读取mysql数据的最小单元例子

  • 连接数据库,生成数据库引擎

# 各种信息需要按照格式这样子编排
connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
self.user, self.password, self.host, self.port, self.database
)
# 创建一个连接引擎
engine = sqlalchemy.create_engine(connection_str)

  • 根据数据库引擎,创建连接

conn = engine.connect()

  • 根据连接读取数据

data = pd.read_sql(sql, conn)  # sql就是你自己的sql语句

  • 第四步,关闭连接,然后释放引擎

conn.close()  # 关闭连接
engine.dispose() # 释放引擎

连接mysql的代码封装成一个类

封装成一个类的好处就是你想要用,直接把这个类拿出来即可。

相关的说明在代码里面有详细的注释,里面的ip、账号、密码、等信息是虚假的。可直接复制过去使用。

import logging
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time


class MySqlHelper(object):
def __init__(
self,
host='192.168.15.124',
port=3306,
database='tert_ims',
user='spkjz_wyiter',
password='7cmoP3PDtueVJQj2q4Az',
logger:logging.Logger=None
):
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self.logger = logger

self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
self.user, self.password, self.host, self.port, self.database
)

self.conn = None
self.cursor = None
self.engine = None
self.session = None

def create_table_code(self, file_name):
'''创建表类代码'''
os.system(f'sqlacodegen {self.connection_str} > {file_name}')
return self.conn

def get_conn(self):
'''创建连接或获取连接'''
if self.conn is None:
engine = self.get_engine()
self.conn = engine.connect()
return self.conn

def get_engine(self):
'''创建连接或获取连接'''
if self.engine is None:
self.engine = sqlalchemy.create_engine(self.connection_str)
return self.engine

def get_cursor(self):
'''创建连接或获取连接'''
if self.cursor is None:
self.cursor = self.conn.cursor()
return self.cursor

def get_session(self) -> sessionmaker:
'''创建连接或获取连接'''
if self.session is None:
engine = self.get_engine()
Session = sessionmaker(bind=engine)
self.session = Session()
return self.session

def close_conn(self):
'''关闭连接'''
if self.conn is not None:
self.conn.close()
self.conn = None
self.dispose_engine()

def close_session(self):
'''关闭连接'''
if self.session is not None:
self.session.close()
self.session = None
self.dispose_engine()

def dispose_engine(self):
'''释放engine'''
if self.engine is not None:
# self.engine.dispose(close=False)
self.engine.dispose()
self.engine = None

def close_cursor(self):
'''关闭cursor'''
if self.cursor is not None:
self.cursor.close()
self.cursor = None

def get_data(self, sql, auto_close=True) -> pd.DataFrame:
'''查询数据'''
conn = self.get_conn()
data = None
try:
# 异常重试3次
for i in range(3):
try:
data = pd.read_sql(sql, conn)
break
except Exception as ex:
if i == 2:
raise ex # 往外抛出异常
time.sleep(60) # 一分钟后重试
except Exception as ex:
self.logger.exception(ex)
raise ex # 往外抛出异常
finally:
if auto_close:
self.close_conn()
return data

读取hive数据的最小单元例子,需要用到impala

  • 连接数据库,生成数据库引擎

from impala.dbapi import connect
import sqlalchemy
impala_conn = connect(
host=self.host,
port=self.port,
database=self.database,
auth_mechanism=self.auth_mechanism,
user=self.user,
password=self.password
)
engine = sqlalchemy.create_engine('impala://', creator=get_impala_conn)

  • 根据数据库引擎,创建连接

conn = engine.connect()

  • 根据连接读取数据

data = pd.read_sql(sql, conn)  # sql就是你自己的sql语句

  • 第四步,关闭连接,然后释放引擎

conn.close()  # 关闭连接
engine.dispose() # 释放引擎

可以看到,使用sqlalchemy的话,对于hive数据库而言,跟mysql唯一不同的只是在第一步生成引擎的时候有差异,其它用法都没有差异

连接hive的代码封装成一个类

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time


class HiveHelper(object):
def __init__(
self,
host='10.2.32.12',
port=21051,
database='ur_ti_dw',
auth_mechanism='LDAP',
user='urberi',
password='Ur#730exd',
logger:logging.Logger=None
):
self.host = host
self.port = port
self.database = database
self.auth_mechanism = auth_mechanism
self.user = user
self.password = password
self.logger = logger

self.impala_conn = None
self.conn = None
self.cursor = None # 这里没有详细解释cursor的用法,感兴趣可以去sqlalchemy官网了解
self.engine = None
self.session = None # 这里没有详细解释session的用法,感兴趣可以去sqlalchemy官网了解

def create_table_code(self, file_name):
'''创建表类代码'''
os.system(f'sqlacodegen {self.connection_str} > {file_name}')
return self.conn

def get_conn(self):
'''创建连接或获取连接'''
if self.conn is None:
engine = self.get_engine()
self.conn = engine.connect()
return self.conn

def get_impala_conn(self):
'''创建连接或获取连接'''
if self.impala_conn is None:
self.impala_conn = connect(
host=self.host,
port=self.port,
database=self.database,
auth_mechanism=self.auth_mechanism,
user=self.user,
password=self.password
)
return self.impala_conn

def get_engine(self):
'''创建连接或获取连接'''
if self.engine is None:
self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
return self.engine

def get_cursor(self):
'''创建连接或获取连接'''
if self.cursor is None:
self.cursor = self.conn.cursor()
return self.cursor

def get_session(self) -> sessionmaker:
'''创建连接或获取连接'''
if self.session is None:
engine = self.get_engine()
Session = sessionmaker(bind=engine)
self.session = Session()
return self.session

def close_conn(self):
'''关闭连接'''
if self.conn is not None:
self.conn.close()
self.conn = None
self.dispose_engine()
self.close_impala_conn()

def close_impala_conn(self):
'''关闭impala连接'''
if self.impala_conn is not None:
self.impala_conn.close()
self.impala_conn = None

def close_session(self):
'''关闭连接'''
if self.session is not None:
self.session.close()
self.session = None
self.dispose_engine()

def dispose_engine(self):
'''释放engine'''
if self.engine is not None:
# self.engine.dispose(close=False)
self.engine.dispose()
self.engine = None

def close_cursor(self):
'''关闭cursor'''
if self.cursor is not None:
self.cursor.close()
self.cursor = None

def get_data(self, sql, auto_close=True) -> pd.DataFrame:
'''查询数据'''
conn = self.get_conn()
data = None
try:
# 异常重试3次
for i in range(3):
try:
data = pd.read_sql(sql, conn)
break
except Exception as ex:
if i == 2:
raise ex # 往外抛出异常
time.sleep(60) # 一分钟后重试
except Exception as ex:
self.logger.exception(ex)
raise ex # 往外抛出异常
finally:
if auto_close:
self.close_conn()
return data

读取ClickHouse数据库的最小单元例子

  • 连接数据库,生成数据库引擎

import sqlalchemy
connection_str = 'clickhouse://%s:%s@%s:%d/%s?ssl=True' %(
self.user, self.password, self.host, self.port, self.database
)
engine = sqlalchemy.create_engine(self.connection_str)

  • 根据数据库引擎,创建连接

conn = engine.connect()

  • 根据连接读取数据

data = pd.read_sql(sql, conn)  # sql就是你自己的sql语句

  • 第四步,关闭连接,然后释放引擎

conn.close()  # 关闭连接
engine.dispose() # 释放引擎

显而易见,更上面的差别也只是在生成引擎部分而已。不严谨得推测,不管是什么类型数据库,用上面这个流程,根据官网的提示去生成对应的引擎即可。这就是sqlalchemy的优点之处,对所有数据库进行了整合。

连接ClickHouse的代码封装成一个类

同学们,阅读一遍代码很重要,代码都有注释

import logging
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time


class ClickhouseHelper(object):
def __init__(
self,
host='10.2.160.11',
port=8123,
database='urbidb',
user='admin',
password='UrClickhouse123admin',
logger:logging.Logger=None
):
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self.logger = logger

self.connection_str = 'clickhouse://%s:%s@%s:%d/%s?ssl=True' %(
self.user, self.password, self.host, self.port, self.database
)

self.conn = None
self.cursor = None
self.engine = None
self.session = None

def create_table_code(self, file_name):
'''创建表类代码'''
os.system(f'sqlacodegen {self.connection_str} > {file_name}')
return self.conn

def get_conn(self):
'''创建连接或获取连接'''
if self.conn is None:
engine = self.get_engine()
self.conn = engine.connect()
return self.conn

def get_engine(self):
'''创建连接或获取连接'''
if self.engine is None:
self.engine = sqlalchemy.create_engine(self.connection_str)
return self.engine

def get_cursor(self):
'''创建连接或获取连接'''
if self.cursor is None:
self.cursor = self.conn.cursor()
return self.cursor

def get_session(self) -> sessionmaker:
'''创建连接或获取连接'''
if self.session is None:
engine = self.get_engine()
Session = sessionmaker(bind=engine)
self.session = Session()
return self.session

def close_conn(self):
'''关闭连接'''
if self.conn is not None:
self.conn.close()
self.conn = None
self.dispose_engine()

def close_session(self):
'''关闭连接'''
if self.session is not None:
self.session.close()
self.session = None
self.dispose_engine()

def dispose_engine(self):
'''释放engine'''
if self.engine is not None:
# self.engine.dispose(close=False)
self.engine.dispose()
self.engine = None

def close_cursor(self):
'''关闭cursor'''
if self.cursor is not None:
self.cursor.close()
self.cursor = None

def get_data(self, sql, auto_close=True) -> pd.DataFrame:
'''查询数据'''
conn = self.get_conn()
data = None
try:
# 异常重试3次
for i in range(3):
try:
data = pd.read_sql(sql, conn)
break
except Exception as ex:
if i == 2:
raise ex # 往外抛出异常
time.sleep(60) # 一分钟后重试
except Exception as ex:
self.logger.exception(ex)
raise ex # 往外抛出异常
finally:
if auto_close:
self.close_conn()
return data

有了这个基础的连接类之后,你可以根据不同类的不同对象去实现数据库的所有增删改查。因为我是做人工智能算法的,所以写入操作和读取操作用得比较多,修改和更新操作用得不多。


举报

相关推荐

0 条评论