0
点赞
收藏
分享

微信扫一扫

sqlalchemy操作数据表的创建与数据的增删改查


文章目录

  • ​​数据表​​
  • ​​基本使用​​
  • ​​增删改查​​
  • ​​查询基础​​
  • ​​查询进阶​​

数据表

  • 单表的创建
  • 一对多表
  • 多对多表

​models.py​

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 字段和字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

Base = declarative_base()

class User(Base):
__tablename__ = "users" # 默认以类名小写作为表名
id = Column(Integer, primary_key=True) # id 主键
# mysql中主键自动建索引:聚簇索引
# 其他索引叫辅助索引
name = Column(String(32), index=True, nullable=False) # name列, 索引, 不可为空
email = Column(String(32), unique=True) # 唯一
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)

# 类似于django的Meta
__table_args__ = (
UniqueConstraint("id", "name", name="uix_id_name"), #联合唯一
Index("ix_id_name", "name", "email"), # 索引
)

def __str__(self):
return self.name

def __repr__(self):
return self.name

# 一对多关系

# 一个Hobby可以有很多人喜欢
# 一个人只能有一个Hobby
class Hobby(Base):
__tablename__ = "hobby"
id = Column(Integer, primary_key=True)
caption = Column(String(50), default="篮球")

class Person(Base):
__tablename__ = "person"
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名, uselist=False
# 一对多的关系, 关联字段写在多的一方
hobby_id = Column(Integer, ForeignKey("hobby.id")) # 可以为空

# 跟数据库无关, 不会新增字段, 只用于快速链表操作
# 类名, backref 用于反向查询
# Hobby.pers可以拿到所有的Person对象
hobby = relationship("Hobby", backref="pers")

# 多对多
# 第三张表
class Boy2Girl(Base):
__tablename__ = "boy2girl"
id = Column(Integer, primary_key=True, autoincrement=True) # 默认为True自增
girl_id = Column(Integer, ForeignKey("girl.id"))
boy_id = Column(Integer, ForeignKey("boy.id"))

class Girl(Base):
__tablename__ = "girl"
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)

class Boy(Base):
__tablename__ = "boy"

id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)

# 与生成表结构无关, 仅用于查询方便, 放在哪个表单中都可以
# secondary 指定通过哪个表建立关联
girls = relationship("Girl", secondary="boy2girl", backref="boys")

# 创建表
def create_table():
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/aaa?charset=utf8",
max_overflow = 0, # 超过连接池大小外最多创建的连接
pool_size = 5, # 连接池大小
pool_timeout = 30, # 池中没有线程最多等待的时间, 否则报错
pool_recycle = -1 # 多久之后对线程池的线程进行一次连接的回收(重置)
)
# 通过engine对象创建表
Base.metadata.create_all(engine)

# 删除表
def drop_table():
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/aaa?charset=utf8",
max_overflow = 0, # 超过连接池大小外最多创建的连接
pool_size = 5, # 连接池大小
pool_timeout = 30, # 池中没有线程最多等待的时间, 否则报错
pool_recycle = -1 # 多久之后对线程池的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine)


if __name__=="__main__":
create_table()
# drop_table()

基本使用

from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from models import User

# 1. 制作engine
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/aaa?charset=utf8",
max_overflow = 0, # 超过连接池大小外最多创建的连接
pool_size = 5, # 连接池大小
pool_timeout = 30, # 池中没有线程最多等待的时间, 否则报错
pool_recycle = -1 # 多久之后对线程池的线程进行一次连接的回收(重置)
)
# 2. 制造一个Connection类
Connection = sessionmaker(bind=engine)
# 3. 得到一个conn对象
# conn = Connection()
conn = scoped_session(Connection)
# 4. 创建一个对象
obj1 = User(name="xiaoda", email="123@qq.com")
# 5. 把对象通过add放入
conn.add(obj1)
# 6. 提交
conn.commit()
conn.close()

增删改查

from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from models import User, Person, Hobby

# 1. 制作engine
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/aaa?charset=utf8",
max_overflow = 0, # 超过连接池大小外最多创建的连接
pool_size = 5, # 连接池大小
pool_timeout = 30, # 池中没有线程最多等待的时间, 否则报错
pool_recycle = -1 # 多久之后对线程池的线程进行一次连接的回收(重置)
)
# 2. 制造一个Connection类
Connection = sessionmaker(bind=engine)
# 3. 得到一个conn对象
conn = Connection()

# obj1 = User(name="awda", email="123awd@qq.com")
# obj2 = User(name="xtgh", email="35fse@qq.com")
# obj3 = User(name="EFfes", email="c3r23@qq.com")
# 新增相同对象
# conn.add_all([obj1, obj2, obj3])

# 新增不同对象
# conn.add_all([Person(name="laz"), Hobby()])


###2 简单删除 (查到删除)
# 注意filter与filter_by的区别
# res = conn.query(User).filter_by(name="xiaoda").delete()
# res = conn.query(User).filter(User.id>3).delete()
# conn.commit()
# print(res.name)

### 3. 修改
# res = conn.query(User).filter_by(id=3).update({"name": "exe"})
# res = conn.query(User).filter_by(id=3).update({User.name: "exe"})

# 类似与django的F查询
# conn.query(User).filter(User.id > 3).update({User.name: User.name + "999"}, synchronize_session=False) # 如果要把它转成字符串相加
# conn.query(User).filter(User.id > 3).update({"age": User.age + 99}, synchronize_session="evaluate") # 如果要把它转成数字相加

### 4 基本查询

# res = conn.query(User).all()
# res = conn.query(User).first()

# filter传的表达式, filter_by传的是参数
# res = conn.query(User).filter(User.id==1).all()
# res = conn.query(User).filter(User.id>=1).all()
# res = conn.query(User).filter(User.id<=1).all()

# res = conn.query(User).filter_by(id=3).all()

# 自定义sql
res = conn.query(User).from_statement(text("select * from users where name=:name").params(name="exe")).all()
# print(res[0].name)


conn.commit()
# 并没有真正关闭连接, 而是放回池子中
conn.close()

查询基础

from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from models import User, Person, Hobby

# 1. 制作engine
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/aaa?charset=utf8",
max_overflow = 0, # 超过连接池大小外最多创建的连接
pool_size = 5, # 连接池大小
pool_timeout = 30, # 池中没有线程最多等待的时间, 否则报错
pool_recycle = -1 # 多久之后对线程池的线程进行一次连接的回收(重置)
)

Connection = sessionmaker(bind=engine)
conn = Connection()

# res = conn.query(User).filter_by(name="exe").all()

# 表达式, and条件相连
# res = conn.query(User).filter(User.id>1, User.name == "exe").all()

# # 查找id在1和10之间,并且name=exe的对象
# res = conn.query(User).filter(User.id.between(1, 10), User.name == "exe").all()

# in条件
# res = conn.query(User).filter(User.id.in_([3, 6, 7])).all()
# 取反 ~
# res = conn.query(User).filter(~User.id.in_([3, 6, 7])).all()

# 二次筛选
# res = conn.query(User).filter(User.id.in_(conn.query(User.id).filter_by(name="exe"))).all()
# res = conn.query(User.id, User.name).filter(User.id.in_(conn.query(User.id).filter_by(name="exe"))).all()

"""
SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.ctime AS users_ctime, users.extra AS users_extra
FROM users
WHERE users.id IN (SELECT users.id AS users_id
FROM users
WHERE users.name = %(name_1)s)
"""

from sqlalchemy import and_, or_
# or_ 包裹的都是or条件 and_ 包裹的都是and条件
# 查询id>2并且name=exe的人
# res = conn.query(User).filter(and_(User.id>2, User.name == "exe")).all()

# 查询id>4或者name=exe的人
# res = conn.query(User).filter(or_(User.id>4, User.name == "exe")).all()

# res = conn.query(User).filter(
# or_(
# User.id < 5,
# and_(User.name == "xtgh999", User.id > 3),
# User.extra != ""
# )).all()

"""
SELECT * FROM users WHERE users.id < 5 OR users.name = "xtgh999" AND users.id > 3 OR users.extra != ''
"""

# 通配符, 以e开头, 不以e开头
# res = conn.query(User).filter(User.name.like("e%")).all()
# res = conn.query(User).filter(~User.name.like("e%")).all()

# 限制, 用于分页, 区间limit
# res = conn.query(User)[1:3]


# 排序, 根据name降序排列(从大到小)
# res = conn.query(User).order_by(User.name.desc()).all()
# res = conn.query(User).order_by(User.name.asc()).all()

# 第一个条件重复后, 再按第二个条件升序排
# res = conn.query(User).order_by(User.name.desc(), User.id.asc()).all()


# 分组
# 分组之后最大id, id之和, 最小id
# sql分组后, 要查询的字段只能有分组字段和集合函数
from sqlalchemy.sql import func
#
# res = conn.query(
# func.max(User.id),
# func.sum(User.id),
# func.min(User.id),
# User.name
# ).group_by(User.name).all()
#
# for obj in res:
# print(obj[0], "---",obj[1], "---",obj[2], "---", obj[3])


# having筛选
res = conn.query(
func.max(User.id),
func.sum(User.id),
func.min(User.id),
User.name
).group_by(User.name).having(func.min(User.id) > 7).all()

print(res)

查询进阶

from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from models import User, Person, Hobby, Boy, Boy2Girl, Girl

engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/aaa?charset=utf8",max_overflow = 0)
Connection = sessionmaker(bind=engine)
conn = Connection()

# 一对多插入数据
# Person表中要加 hobby = relationship("Hobby", backref="pers")
# p = Person(name="李四", hobby=Hobby(caption="足球"))
# conn.add(p)
# 等价于
# p = Person(name="李四3")
# p.hobby = Hobby(caption="足球")
# conn.add(p)
# 第二种方式
# hb = Hobby(caption="足球")
# hb.pers = [Person(name="小美"), Person(name="小胖")]
# conn.add(hb)

# conn.commit()

# 查询
# 基于对象的跨表查询
# p = conn.query(Person).filter_by(name="李四").first()
# print(p)
# print(p.hobby)
# print(p.hobby.caption)

# h = conn.query(Hobby).filter_by(caption="足球").first()
# h = conn.query(Hobby).filter_by(id=6).first()
# # 反查
# print(h.pers)

# 基于联表的跨表查(查一次)
# 默认根据外键联表
# isouter=True 左外连, 表示Person left join Hobby
# 没有右连接, 反过来即可(Hobby, Person).join(Person, isouter=True)
# 不写inner join
# res = conn.query(Person, Hobby).join(Hobby, isouter=True).all()
# print(res)

# for row in res:
# print(row[0].name, row[1].caption)
"""
select * from person left join hobby on person.hobby_id = hobby.id
"""
#
# res = conn.query(Person, Hobby).filter(Person.hobby_id == Hobby.id).all()
# print(res)

"""
select * from user,hobby where person.hobby_id = hobby.id
"""
# inner join
# res = conn.query(Person).join(Hobby)
# print(res)

### 要有 girls = relationship("Girl", secondary="boy2girl", backref="boys")
# 多对多
# girl = Girl(name="小白")
# girl.boys = [Boy(hostname="小黑"), Boy(hostname="小黄")]
# conn.add(girl)
# conn.commit()

# boy = Boy(hostname="小米")
# boy.girls = [Girl(name="小谢"), Girl(name="小碧")]
# conn.add(boy)
# conn.commit()

# 基于对象的跨表查
# girl = conn.query(Girl).filter_by(id=1).first()
# print(girl.boys)

# 基于联表的跨表查询
# res = conn.query(Girl.name).filter(Boy.id==Boy2Girl.boy_id, Girl.id==Boy2Girl.girl_id, Boy.hostname=="小米").all()
#
# res = conn.query(Girl.name).join(Boy2Girl).join(Boy).filter(Boy.hostname=="小米").all()
res = conn.query(Girl.name).join(Boy2Girl).join(Boy).filter_by(hostname="小米").all()

print(res)



# 执行原生sql
cursor = conn.execute("insert into users(name) values(:value)",params={"value": "xxx"})
conn.commit()


举报

相关推荐

0 条评论