0
点赞
收藏
分享

微信扫一扫

Python数据库篇:sqlite3、mysql、sqlalchemy


一:sqlite3

import sqlite3
conn = sqlite3.connect("test.db")
cursor = conn.cursor()
cursor.execute("create table user (id varchar(20) primary key, name varchar(20))")
cursor.execute("insert into user (id, name) values (\'1\', \'Michael\')")
print(cursor.rowcount)
conn.commit()


cursor.execute("select * from user where id=?", ("1",))
rows = cursor.fetchall()
print(rows)

cursor.close()
conn.close()

二:MySQL

pip install pymysql

import pymysql
from pymysql.cursors import DictCursor
# 1.创建连接
# cursorclass: 全局定义,表示查询时是返回的数据类型,字典还是元组(默认)
conn = pymysql.connect(host='127.0.0.1',
                       port=3306,
                       user='root',
                       password='123456',
                       database='test',
                       charset='utf8',
                       autocommit=True,
                       cursorclass=DictCursor)

# 2.获取游标,局部定义cursor
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

# 3.定义sql,最好使用"""来定义字符串,这样关键字会变色
create_table_sql = """
CREATE TABLE IF NOT EXISTS sys_user2 (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(15) NOT NULL COMMENT '用户名',
  `gender` tinyint(3) unsigned DEFAULT '0' COMMENT '性别(0: 女 1:男)',
  `amount` decimal(10,2) DEFAULT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '注册时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk` (`username`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
"""
# 4.执行
cursor.execute(create_table_sql)

delete_sql = """
delete from sys_user2
"""
cursor.execute(delete_sql)

insert_sql = """
insert into sys_user2(username, gender, amount, create_time) values(%s, %s, %s, now())
"""
# 插入1条
cursor.execute(insert_sql, ['monday', 0, 9999999])
# 插入多条
cursor.executemany(insert_sql, [('test', 0, 9999999), ('modely', 0, 66666666)])

update_sql = """
update sys_user2 set amount = %s where username = %s
"""
cursor.execute(update_sql, ['88888888', 'modely'])

# 注意:先查询再获取数据,不像其它语言一样直接返回结果
select_sql = """
select * from sys_user2
"""
cursor.execute(select_sql)
fetchone = cursor.fetchone()
fetchmany = cursor.fetchmany(2)
# 注意:总共有三条,上面游标已经走到最后了,所以下面游标再走就没有数据了
fetchall = cursor.fetchall()

# insert, update, delete 操作都需要进行提交,如果配置了autocommit=True就不需要显式提交了会自动提交
# conn.commit()
# conn.rollback()

# 5.关闭游标
cursor.close()

# 关闭连接
conn.close()

三:ORM sqlalchemy

3.1 简介

在Python中最著名的ORM(Object Relationship Mapping)对象关系映射)框架是SQLAlchemy,类似于Java中的Hibernate, 在Java中Hibernate已经被淘汰多年了,原因是Hibernate属于重量级框架SQL是框架自动生成的不能手动写SQL来优化SQL语句。在Java中一般都使用MyBatis,自己写sql语句,然后映射到对象上。

SQLAlchemy只是一种ORM框架,它并不能直接操作数据库,直接操作数据库还需要通过pymysql模块来操作。

ORM最重要的映射有两个:一是表名和实体类的映射;另一个是表的字段和类的属性之间的映射;

ORM基类:
获取ORM基类通过 Base = declarative_base()
获取,所有实体类都要继承Base类。

类与表的映射通过__tablename__属性来指定,如 __tablename__="user"

属性与字段的映射通过Column类来实现。可以指定列的数据类型、是否允许为空、默认值、是否为主键、是否自增、是否唯一、 注释等

sqlalchemy针对MySQL方言提供了专门的数据类型sqlalchemy.dialects.mysql, 如果使用的数据库是MySQL建议使用这套数据类型,这套数据类型和MySQL的数据类型一一对应。

3.2 语法

1. query()函数中参数是要查询的字段,如果要查询所有字段只需要将实体类型作为参数,如果要查询多个字段就通过

# select * from user
session.query(User).all()

# select id, username from user
session.query(User.id, User.username).all()

2. 关联关系:relationship('引用的类名',backref='反向关联的属性')

# 一篇文章的作者对应一个用户
author = relationship('User',backref='articles')

# 一个作者有多篇文章
articles = relationship("Article")

3. 删除修改都行将对象先查询出来,然后再操作,这样就是执行2次,还不如直接写sql执行1次。
4. 实际开发中表一般是不设置外键的,使用ORM就必须设置外键了。

3.3 示例

1. 安装依赖

pip install pymysql
pip install sqlalchemy

2. 建表和初始化数据

#!/usr/bin/env python 
# -*- coding:utf-8 -*-
author = 'suncity'

import sqlalchemy

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table, Column, types, Enum, ForeignKey, and_, or_
from sqlalchemy.dialects.mysql import VARCHAR, TEXT, BIGINT, INTEGER, SMALLINT, TINYINT, DECIMAL, FLOAT, DOUBLE, DATETIME, \
    TIMESTAMP
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.sql import func

from enum import Enum, unique
from random import randint

# 初始化数据库连接
HOST = '127.0.0.1'
PORT = 3306
USERNAME = 'root'
PASSWORD = 'root123'
DATABASE = 'test'
DATASOURCE_URL = "mysql+pymysql://{username}:{password}@{host}:{port}/" \
                 "{db}?charset=utf8".format(username=USERNAME, password=PASSWORD, host=HOST, port=PORT, db=DATABASE)
engine = create_engine(DATASOURCE_URL, encoding='utf-8', echo=True)
# 判断有没有链接成功
conn = engine.connect()
result = conn.execute("select 1").fetchone()
print(result)

# ORM基类
Base = declarative_base()

# 多对多配置
user_teacher = Table("user_teacher",
                     Base.metadata,
                     Column("user_id", BIGINT(unsigned=True), ForeignKey("user.id"), primary_key=True),
                     Column("teacher_id", BIGINT(unsigned=True), ForeignKey("teacher.id"), primary_key=True))


@unique
class StatusEnum(Enum):
    CLOSE = 0
    OPEN = 1


class User(Base):
    tablename = "user"

    id = Column(BIGINT(unsigned=True), primary_key=True, autoincrement=True)
    username = Column(VARCHAR(15), unique=True, nullable=False, comment="用户名")
    # 数字类型的默认值需要写成字符串
    gender = Column(TINYINT(unsigned=True), server_default="0", comment="性别(0: 女 1:男)")
    age = Column(TINYINT(unsigned=True), server_default="0", comment="年龄")
    # name用于指定数据库中的字段名,如不指定和属性名保持一致
    # 数据库命名规范一般是小写,每个单词用下划线分隔,如果Python属性也采用同样的命名规则就不需要显式指定列明。
    # 只有当属性名和列明不一样时才显式指定
    price = Column(DECIMAL(10, 2), name="amount", nullable=True)

    # 枚举实际中不建议使用(只有高版本的MySQL才支持enum类型),这里只是演示一下,实际情况下一般使用tinyint
    status = Column(types.Enum(StatusEnum))

    # DateTime类型的默认值使用func.now()
    create_time = Column(DATETIME, server_default=func.now(), comment="注册时间")
    # onupdate 当更新数据时会自动修改值
    update_time = Column(TIMESTAMP, onupdate=func.now())

    # 正向一对一(关闭一对多就是一对一了)
    # cascade=save-update默认值,当添加和更新的时候相关联的数据也会级联更新
    detail = relationship("UserDetail", uselist=False, cascade="save-update,delete")
    # 一对多
    tags = relationship("Tag")
    # 多对多, secondary用于指定中间表
    teachers = relationship("Teacher", secondary=user_teacher)

    def __init__(self, username, gender):
        self.username = username
        self.gender = gender

    def __str__(self):
        return ",\n".join([str(item) for item in self.__dict__.items()])


class UserDetail(Base):
    tablename = "user_detail"

    id = Column(BIGINT(unsigned=True), primary_key=True, autoincrement=True)
    city = Column(VARCHAR(15), nullable=False, comment="地址")
    description = Column(TEXT, nullable=True, comment="介绍")
    user_id = Column(BIGINT(unsigned=True), ForeignKey("user.id"))

    # 反向一对一
    user = relationship("User", uselist=False)


    def __init__(self, city, description, user_id = None):
        self.city = city
        self.description = description
        self.user_id = user_id


class Tag(Base):
    tablename = "tag"

    id = Column(BIGINT(unsigned=True), primary_key=True, autoincrement=True)
    tag = Column(VARCHAR(15), nullable=False, comment="标签")
    user_id = Column(BIGINT(unsigned=True), ForeignKey("user.id"))

    def __init__(self, tag, user_id):
        self.tag = tag
        self.user_id = user_id

    def __str__(self):
        return ",\n".join([str(item) for item in self.__dict__.items()])


class Teacher(Base):
    tablename = "teacher"

    id = Column(BIGINT(unsigned=True), primary_key=True, autoincrement=True)
    name = Column(VARCHAR(15), nullable=False, comment="姓名")

    def __init__(self, name):
        self.name = name


# 删除所有表结构(一般不用,这里只是为了每次都初始化数据)
Base.metadata.drop_all(engine)
# 创建表结构(如果映射已经发生了改变不会重复创建)


Base.metadata.create_all(engine)
# 创建session实例
session = sessionmaker(engine)()

users = [("xiaoming", "shanghai", "description1", ["活泼", "热情", "美丽"], [1, 2, 3]),
         ("xiaohong", "beijing", "description2", ["开朗"], [1, 2]),
         ("wangwu", "hangzhou", "description3", ["机敏"], [2, 3]),
         ("suncity", "suzhou", "description4", ["健谈"], [1, 3])
         ]
teacher_zhang = Teacher("张老师")
teacher_wang = Teacher("王老师")
teacher_li = Teacher("李老师")
teacher_list = [teacher_zhang, teacher_wang, teacher_li]
# 批量插入
session.add_all(teacher_list)

for username, city, description, tags, teachers in users:
    # 执行原生SQL
    result = session.execute("insert into user(username, gender, age, status)values(:username, :gender, :age, :status)",
                             params={"username": username, "gender": randint(0, 1), "age": randint(0, 150),
                                     "status": StatusEnum.OPEN.value})
    session.commit()
    user_id = result.lastrowid

    session.add(UserDetail(city, description, user_id))
    for tag in tags:
        session.add(Tag(tag, user_id))

    for teacher_id in teachers:
        session.execute("insert into user_teacher(user_id, teacher_id)values(:userId, :teacherId)",
                        params={"userId": user_id, "teacherId": teacher_id})
else:
    session.commit()

3. execute

# 执行原生SQL
result = session.execute("select * from user where id > 1 order by create_time desc limit 1, 10").fetchall()
print(result)

4. filter

# select * from user
session.query(User).all()

# select * from user where id = 1
session.query(User).get(1)

# select * from user where username ='suncity' limit 1
session.query(User).filter_by(username="suncity").first()

# select * from user where username !='suncity' limit 1
session.query(User).filter(User.username!="suncity").first()

# select * from user where username like 'xiao%'
session.query(User).filter(User.username.like("xiao%")).all()

# ilike忽略大小写,其实是统一转为小写
# select * from user where lower(user.username) LIKE lower("%xiao")
session.query(User).filter(User.username.ilike("xiao%")).all()

# select * from user where id in(1, 2)
session.query(User).filter(User.id.in_([1, 2])).all()

# select * from user where id not in(1, 2)
session.query(User).filter(User.id.notin_([1, 2])).all()

# select * from user where update_time IS NULL
session.query(User).filter(User.update_time == None).all()

# select * from user where update_time IS NOT NULL
session.query(User).filter(User.update_time != None).all()

# filter_by指定属性时不需要指定类名, query中可以指定要查询的列, label给列起别名就是SQL中的as
# select id, username, create_time as join_time from user where username = "suncity"
session.query(User.id, User.username, User.create_time.label("join_time")).filter_by(username="suncity").all()

# and方式一:多个filter使用and拼接
# select * from user where id in(1, 2) and username like 'xiao%'
session.query(User).filter(User.id.in_([1, 2])).filter(User.username.like('xiao%')).all()

# and方式二:将多个条件写在一个filter中
# select * from user where id = 1 and username = 'xiaoming'
session.query(User).filter(User.id == 1, User.username == 'xiaoming').first()

# and方式三:使用and_来指定
# select * from user where id = 1 and username = 'xiaoming'
session.query(User).filter(and_(User.id == 1, User.username == 'xiaoming')).first()

# or_
# select * from user where id = 1 or username = 'xiaohong'
session.query(User).filter(or_(User.id == 1, User.username == 'xiaohong')).all()

# select * from user where id = 1 and (username='xiaoming' or gender = 0)
session.query(User).filter(User.id == 1, or_(User.username == 'xiaoming', User.gender == 0)).all()

5. 聚合函数

# func类有常用的聚合函数,如:count()、avg()、max()、min()、sum()
# SELECT count(user.id) FROM user LIMIT 1
session.query(func.count(User.id)).first()
# SELECT avg(user.age) FROM user LIMIT 1
session.query(func.avg(User.age)).first()
# SELECT max(user.age) FROM user LIMIT 1
session.query(func.max(User.age)).first()
# SELECT min(user.age) FROM user LIMIT 1
session.query(func.max(User.age)).first()
# SELECT sum(user.age) FROM user LIMIT 1
session.query(func.sum(User.age)).first()

6. 排序

# SELECT * FROM user ORDER BY create_time
session.query(User).order_by(User.create_time).all()
# SELECT * FROM user ORDER BY create_time DESC
session.query(User).order_by(User.create_time.desc()).all()
# SELECT * FROM user ORDER BY create_time ASC
session.query(User).order_by(User.create_time.asc()).all()

7. 分组

# 分组
# SELECT gender, count(user.id) AS count_1 FROM user GROUP BY gender
session.query(User.gender, func.count(User.id)).group_by(User.gender).all()
# SELECT gender, count(user.id) AS count_1 FROM user GROUP BY gender HAVING gender > 2
session.query(User.gender, func.count(User.id)).group_by(User.gender).having(User.gender > 2).all()

8. 分页

# select * from user limit 3
session.query(User).limit(3).all()
# select * from user limit 2, 18446744073709551615
session.query(User).offset(2).all()
# select * from user limit 2, 3
session.query(User).offset(2).limit(3).all()
# select * from user limit 1, 2
session.query(User).slice(1, 3).all()
# select * from user limit 1, 2
session.query(User)[1:3]

9. 关联查询和子查询

# 关联查询
# SELECT user.username, user_detail.city FROM user INNER JOIN user_detail ON user.id = user_detail.user_id
session.query(User.username, UserDetail.city).join(UserDetail, User.id == UserDetail.user_id).all()
# SELECT user.username, user_detail.city FROM user LEFT OUTER JOIN user_detail ON user.id = user_detail.user_id
session.query(User.username, UserDetail.city).outerjoin(UserDetail, User.id == UserDetail.user_id).all()

# 子查询subquery
# SELECT
#  user.id AS user_id, user.username AS user_username
# FROM user, (SELECT user_detail.user_id AS user_id FROM user_detail WHERE user_detail.city = 'shanghai') AS anon_1
# WHERE user.id IN (anon_1.user_id)
sq = session.query(UserDetail.user_id).filter(UserDetail.city == "shanghai").subquery()
session.query(User.id, User.username).filter(User.id.in_(sq.c)).all()


# 所有的子查询都转换为多表连接查询
# SELECT user.id, user.username, anon_1.city
# FROM user, (SELECT user_detail.city FROM user_detail, user WHERE user_detail.id = user.id) AS anon_1
sq = session.query(UserDetail.city).filter(UserDetail.id == User.id).subquery()
session.query(User.id, User.username, sq.c.city).all()

10. 懒加载

# 懒加载 backref(lazy="select")
# select * from user where id = 1
user = session.query(User).get(1)
# 当获取user.detail.city时会执行 select * from user_detail where user_id = 1
print(user.detail.city)

# select * from user_detail where user_id = 1
user_detail = session.query(UserDetail).filter_by(user_id=1).first()
# select * from user where id = 1
print(user_detail.user.username)


user3 = session.query(User).filter_by(id=1).first()
print(str([item.tag for item in user3.tags]))
# <class 'sqlalchemy.orm.collections.InstrumentedList'>
print(type(user3.tags))

11. 级联添加

# 级联添加
# insert into user(username, gender, amount, status, update_time)values("admin", 0, null, null, null)
# insert into user_detail(city, description, user_id)values('shenzhen', 'admin description', 5)
# INSERT INTO user_teacher (user_id, teacher_id) VALUES(5, 1)(5, 3)
admin_user = User("admin", 0)
detail = UserDetail(city="shenzhen", description="admin description")
admin_user.detail = detail
admin_user.teachers = [teacher_zhang, teacher_li]
session.add(admin_user)
session.commit()


# 反向级联添加
# insert into user(username, gender, amount, status, update_time)values("root", 0, null, null, null)
# insert into user_detail(city, description, user_id)values('wuhan', 'root description', 5)
rootdetial = UserDetail(city="wuhan", description="root description")
rootdetial.user = User("root", 1)
session.add(rootdetial)
session.commit()

12. update

# 更新: 一般先查询出来,然后修改属性的值然后提交即可完成修改
obj = session.query(User).filter_by(id=1).first()
obj.gender = 0
session.commit()

13. delete

# 删除主表时如果有其它表外键引用主表主键orm会先处理掉使用外键的表(删除记录、设置外键为null),然后最后再处理主表
# 多对多:DELETE FROM user_teacher WHERE user_teacher.user_id = 1
# 一对多:会将外键设置为null, UPDATE tag SET user_id=null WHERE tag.id in (1, 2, 3)
# 一对一:会将外键设置为null, UPDATE user_detail SET user_id=null WHERE user_detail.id = 1
# 最后才会删除主表: DELETE FROM user WHERE user.id = 1
obj = session.query(User).get(1)
session.delete(obj)
session.commit()


举报

相关推荐

0 条评论