0
点赞
收藏
分享

微信扫一扫

SQLAlchemy核心查询Core Query和对象查询Object Query

佛贝鲁先生 04-04 08:00 阅读 2

在SQLAlchemy 中,查询可以分为两类:核心查询和对象查询。

核心查询(Core Query)是指使用SQLAlchemy的核心功能进行查询的方式。这种查询方式使用的是SQLAlchemy的表对象(Table Object)和查询构建器(Query Builder)来构建查询语句,并且不依赖于任何特定的ORM(Object-Relational Mapping)映射。核心查询通常用于更复杂的查询需求,例如使用原生SQL语句进行查询。

对象查询(Object Query)是指使用SQLAlchemy的ORM功能进行查询的方式。这种查询方式使用的是SQLAlchemy的模型类(Model Class)和查询语句(Query Statement)来构建查询语句,并且基于ORM映射进行查询。对象查询更适合于简单的查询需求,例如通过模型类的属性进行查询。

总的来说,SQLAlchemy核心查询Core Query更接近于直接的 SQL 查询和操作,而对象查询Object Query则更加面向对象,通过模型类和对象的方式进行数据库操作。开发者可以根据具体的需求选择使用其中的一种或者结合两者使用。

1. **核心查询(Core Queries)**:
   
   核心查询主要涉及 SQLAlchemy 的核心库,用于执行 SQL 查询和操作数据库。这些查询提供了对数据库的直接访问,并返回原始的数据库行或结果。核心查询通常涉及以下几个主要组件:

   - `select()`:用于构建 SELECT 语句。query = select(users.c.name, users.c.email)
   - `insert()`:用于构建 INSERT 语句。query = insert(users).values(name='Alice', email='alice@example.com')
   - `update()`:用于构建 UPDATE 语句。query = update(users).where(users.c.name == 'Alice').values(email='new_email@example.com')
   - `delete()`:用于构建 DELETE 语句。query = delete(users).where(users.c.name == 'Alice')
   - `text()`:用于直接执行原始 SQL 语句。sql = text("SELECT name, email FROM users")
---
SQLAlchemy 核心查询是指使用 SQLAlchemy 核心库中的工具和方法执行 SQL 查询操作。它提供了一种更直接的方法来与数据库交互,而不需要使用 ORM(对象关系映射)模型。
在 SQLAlchemy 中,Table 对象代表数据库中的表,而表中的列则通过 .c 属性来访问。因此,在构建查询语句时,需要使用 .c 属性来指定表中的列
--
在 SQLAlchemy 核心查询中,条件查询是指根据特定的条件从数据库中检索数据。条件查询可以使用 `where()` 方法来添加过滤条件,以筛选满足特定条件的记录。
以下是条件查询的一些常见用法:
1. **等值查询:**
   使用 `==` 运算符来进行等值查询,即查询某一列的值等于特定值的记录。
query = select(users).where(users.c.name == 'Alice')
2. **不等值查询:**
   使用 `!=` 或 `<>` 运算符来进行不等值查询,即查询某一列的值不等于特定值的记录。
query = select(users).where(users.c.name != 'Alice')
3. **范围查询:**
   使用 `between()` 方法来进行范围查询,即查询某一列的值在指定范围内的记录。
query = select(users).where(users.c.age.between(18, 30))
4. **空值查询:**
   使用 `is_(None)` 方法来进行空值查询,即查询某一列的值为空的记录。
query = select(users).where(users.c.email.is_(None))
5. **模糊查询:**
   使用 `like()` 方法来进行模糊查询,即查询某一列的值匹配特定模式的记录。
query = select(users).where(users.c.name.like('A%'))
6. **组合条件查询:**
   可以使用 `and_()`、`or_()` 和 `not_()` 方法来组合多个条件,进行复杂的条件查询。
query = select(users).where(and_(users.c.age.between(18, 30), users.c.name == 'Alice'))
以上是一些常见的条件查询方法,您可以根据具体的需求使用这些方法来构建条件查询语句。

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, insert, select,delete,update

# 创建引擎并连接数据库
engine = create_engine('sqlite:///metadataexample.db')

# 创建元数据对象
metadata = MetaData()

# 定义表结构
users = Table('users', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String),
              Column('email', String))

# 创建表
metadata.create_all(engine)

# 构建删除所有记录的语句
delete_all_query = delete(users)

# 执行删除所有记录操作
with engine.connect() as conn:
    conn.execute(delete_all_query)
    conn.commit()

# 要插入的数据列表
users_to_insert = [
    {'name': 'Alice', 'email': 'alice@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
    {'name': 'Charlie', 'email': 'charlie@example.com'},
    {'name': 'David', 'email': 'david@example.com'}
]

# 构建插入语句并插入数据
query = insert(users)
with engine.connect() as conn:
    for user_data in users_to_insert:
        conn.execute(query.values(**user_data))
    conn.commit()

# 构建查询 #Table 对象代表数据库中的表,而表中的列则通过 .c 属性来访问
query = select(users.c.name, users.c.email)

# 执行查询并获取结果
with engine.connect() as conn:
    result = conn.execute(query)
    rows = result.fetchall()
    for row in rows:
        print(row)

# 构建删除语句
delete_query = delete(users).where(users.c.name == 'Alice')

# 执行删除操作
with engine.connect() as conn:
    conn.execute(delete_query)
    conn.commit()

# 构建更新语句
update_query = update(users).where(users.c.name == 'Bob').values(email='new_email@example.com')

# 执行更新操作
with engine.connect() as conn:
    conn.execute(update_query)
    conn.commit()

# 再次执行查询并获取结果
with engine.connect() as conn:
    result = conn.execute(query)
    rows = result.fetchall()
    for row in rows:
        print(row)

 # 执行批量插入操作

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, text

# 创建引擎并连接数据库
engine = create_engine('sqlite:///users_to_insertexample.db')

# 创建元数据对象
metadata = MetaData()

# 定义 users 表结构
users = Table('users', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String),
              Column('email', String))  # 添加 email 列

# 如果不存在则创建表
metadata.create_all(engine)

# 要插入的数据列表
users_to_insert = [
    {'name': 'Alice', 'email': 'alice@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
    {'name': 'Charlie', 'email': 'charlie@example.com'},
    {'name': 'David', 'email': 'david@example.com'}
]

# 构建插入语句
insert_query = text("INSERT INTO users (name, email) VALUES (:name, :email)")

# 执行批量插入操作
with engine.connect() as conn:
    conn.execute(insert_query, [{"name": user['name'], "email": user['email']} for user in users_to_insert])
    conn.commit()



select_query = text("SELECT * FROM users")
with engine.connect() as conn:
    result = conn.execute(select_query)
    rows = result.fetchall()
    for row in rows:
        print(row)

 

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select

# 创建引擎并连接到数据库
engine = create_engine('sqlite:///in_function.db')
metadata = MetaData()
conn = engine.connect()

# 定义表结构
users = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('age', Integer)
)
metadata.create_all(engine)

# 插入一些数据
# users.insert():创建了一个 INSERT 语句,表示要向 users 表中插入数据。
# conn.execute(users.insert(), [...]):执行了 INSERT 语句。conn 是数据库连接对象,execute() 方法用于执行 SQL 命令。第一个参数是要执行的 SQL 命令,即 users.insert(),第二个参数是要插入的数据,这里使用了列表的形式传入多个字典,每个字典表示一行数据。
# 每个字典表示一行数据,包含了要插入的列和相应的值。在这个例子中,每行数据都包括 name 和 age 两列的值。
conn.execute(users.insert(), [
    {'name': 'Alice', 'age': 25},
    {'name': 'Bob', 'age': 30},
    {'name': 'Charlie', 'age': 35},
    {'name': 'David', 'age': 40}
])

# 选择年龄在指定列表中的行
ages = [25, 35]
query = select(users).where(users.c.age.in_(ages))
result = conn.execute(query)

for row in result:
    print(row)

 

2. **对象查询(Object Queries)**:

   对象查询是使用 SQLAlchemy 的 ORM 功能进行的查询,它允许开发者将数据库表映射为 Python 对象,并使用这些对象来进行数据库操作。对象查询提供了更加面向对象的接口,使得与数据库交互更加直观和灵活。主要组件包括:

   - 定义模型类(Model Class):定义 Python 类来映射数据库表。
   - 使用查询对象(Query Object):通过模型类的查询对象执行查询。
   - 过滤器和条件:使用过滤器和条件来约束查询结果。
   - 增删改查操作:通过 ORM 提供的方法进行数据库操作,如添加、删除、修改、查询等。

SQLAlchemy 2.0的ORM对象查询(Object Queries)支持各种条件查询,常用的条件查询包括:
****等于(Equal):使用==运算符进行相等比较。
session.query(User).filter(User.name == 'John')
****不等于(Not Equal):使用!=运算符进行不等比较。
session.query(User).filter(User.age != 30)
****大于(Greater Than):使用>运算符进行大于比较。
session.query(User).filter(User.age > 21)
****大于等于(Greater Than or Equal):使用>=运算符进行大于等于比较。
session.query(User).filter(User.age >= 18)
****小于(Less Than):使用<运算符进行小于比较。
session.query(User).filter(User.age < 50)
****小于等于(Less Than or Equal):使用<=运算符进行小于等于比较。
session.query(User).filter(User.age <= 65)
****包含于(In):使用in_()方法判断某个字段的值是否在给定列表中。
session.query(User).filter(User.name.in_(['John', 'Jane', 'Alice']))
****不包含于(Not In):使用notin_()方法判断某个字段的值是否不在给定列表中。
session.query(User).filter(User.age.notin_([18, 21, 25]))
****包含(Like):使用like()方法进行模糊匹配。
session.query(User).filter(User.name.like('%son%'))
****不包含(Not Like):使用notlike()方法进行模糊不匹配。
session.query(User).filter(User.name.notlike('%son%'))
****与(And):使用and_()方法进行与操作。
session.query(User).filter(and_(User.age > 18, User.age < 30))
****或(Or):使用or_()方法进行或操作。
session.query(User).filter(or_(User.age < 18, User.age > 65)) 

from sqlalchemy import create_engine, Column, Integer, String, and_, or_, not_
from sqlalchemy.orm import declarative_base, sessionmaker

# 创建引擎并连接数据库
engine = create_engine('sqlite:///Aliceexample.db')

# 创建基类
Base = declarative_base()

# 定义用户模型类
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

# 创建表
Base.metadata.create_all(engine)

# 创建Session类
Session = sessionmaker(bind=engine)

# 创建Session实例
session = Session()

# 添加新用户
new_user = User(name='Alice', email='alice@example.com')
session.add(new_user)
new_user2 = User(name='Alice2', email='alice2@example.com')
session.add(new_user2)
new_user3 = User(name='gem', email='gem@example.com')
session.add(new_user2)
session.commit()

# 查询用户
# user = session.query(User).filter_by(name='Alice2').first()
# 使用 and_() 函数组合多个条件
user = session.query(User).filter(and_(User.name == 'Alice2', User.email == 'alice2@example.com')).first()
print("查询用户:", user.name, user.email if user else None)

# 修改用户信息
if user:
    user.email = 'new_email@example.com'
    session.commit()

# 删除用户
# if user:
#     session.delete(user)
#     session.commit()

# 查询所有用户
# 使用 or_() 函数匹配多个条件中的任意一个
users = session.query(User).filter(or_(User.name == 'Alice', User.name == 'Alice2')).all()

# 输出结果
print("Alice/Alice2所有用户:")
for user in users:
    print(user.name, user.email)

# 查询不满足条件的用户
# 使用 not_() 函数取反条件
users_not_Alice = session.query(User).filter(not_(User.name == 'Alice')).all()
print("不是 Alice 的用户:")
for user in users_not_Alice:
    print(user.name, user.email)

# 查询名字在指定列表中的用户
# 使用 in_() 函数匹配列表中的值
names_to_query = ['Alice', 'Alice2']
users_in_list = session.query(User).filter(User.name.in_(names_to_query)).all()
print("名字在指定列表中的用户:")
for user in users_in_list:
    print(user.name, user.email)

在 SQLAlchemy 的 ORM 中执行批量插入操作通常使用 Session 对象的 bulk_save_objects() 方法或 bulk_insert_mappings() 方法。这两种方法都可以高效地执行大量数据的插入操作。 

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker

# 创建引擎并连接数据库
engine = create_engine('sqlite:///bulk_save_objectsexample.db')

# 创建基类
Base = declarative_base()

# 定义映射类
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

# 创建所有表
Base.metadata.create_all(engine)

# 创建 Session
Session = sessionmaker(bind=engine)
session = Session()

# 构建要插入的数据列表
users_to_insert = [
    {'name': 'Alice', 'email': 'alice@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
    {'name': 'Charlie', 'email': 'charlie@example.com'},
    {'name': 'David', 'email': 'david@example.com'}
]

# 执行批量插入操作
session.bulk_save_objects([User(**user) for user in users_to_insert])


# # 执行批量插入操作
# session.bulk_insert_mappings(users, users_to_insert)

# 提交事务
session.commit()

# 查询所有用户
users = session.query(User).all()
for user in users:
    print(user.name, user.email)
举报

相关推荐

0 条评论