result = session.query(MyTable).filter(MyTable.json_column['key'] == 'value').all()
class SysTestModel(Base):
__tablename__="sys_test_data"
id = Column(BlGlNT, primary_key=True)
name = Column(String(64), comment="name")
relation_dict = Column("relation_dict", JSON, comment="json数据")
memo = Column(String(10))
def__str__(self):
return "<SysTestModel f>".format(self.id)
# [{"rel": "小明"}, {"rel": "123"}, {"rel": "小红"}]
query = SysTestModel.query.filter(db.func.json_contains(SysTestModel.relation_ dict, db.func.json_ object("rel", "123")
# 更新JSON列中的列表值
with engine.connect() as connection:
connection.execute(
my_table.update()
.values(
data={
'list_key': literal(
"[%(new_value)s]" % {"new_value": "新的值"},
type_=my_table.c.data.type,
)
}
)
.where(my_table.c.data['list_key'][0] == "旧的值")
)
from sqlalchemy import create_engine, Table, MetaData, Column, Integer, String, cast, func
from sqlalchemy.dialects.postgresql import JSON, ARRAY
# 假设你已经有了一个engine
engine = create_engine('postgresql://user:password@localhost/mydatabase')
metadata = MetaData()
# 定义表结构
my_table = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('data', JSON)
)
# 更新JSON字段中的数组
new_items = [1, 2, 3] # 新的数组
stmt = my_table.update().where(my_table.c.id == 1).values(
data={
'items': cast(ARRAY(new_items), JSON)
}
)
with engine.connect() as connection:
connection.execute(stmt)
from sqlalchemy import create_engine, MetaData, Table, select, update
from sqlalchemy.dialects.postgresql import JSON, json_unquote, json_contains, json_extract
# 假设你已经有了一个PostgreSQL的Engine
engine = create_engine('postgresql://user:password@localhost:5432/mydatabase')
metadata = MetaData()
# 假设有一个名为example_table的表,它有一个名为data的JSON类型字段
table = Table('example_table', metadata, autoload_with=engine)
# 更新JSON字段里的数组
# 假设data字段包含一个名为"array"的数组,我们要在这个数组中添加一个元素
update_stmt = update(table).where(
table.c.id == 'some_id'
).values(
data=json_unquote(
"{array: " +
"array_append(coalesce(data->>'array', '[]'), '" + new_value_to_add + "')}"
)
)
# 执行更新
with engine.connect() as conn:
conn.execute(update_stmt)
# 检查数组中是否包含特定值
# 假设我们要检查data字段的"array"数组是否包含值"some_value"
select_stmt = select([
json_contains(json_unquote(table.c.data), 'some_value').label('contains_value')
]).where(
table.c.id == 'some_id'
)
# 执行查询
with engine.connect() as conn:
result = conn.execute(select_stmt).fetchone()
contains_value = result['contains_value']
# 提取JSON对象中的值
# 假设我们要从data字段中提取"key"的值
select_stmt = select([
json_extract(json_unquote(table.c.data), 'key').label('extracted_value')
]).where(
table.c.id == 'some_id'
)
# 执行查询
with engine.connect() as conn:
result = conn.execute(select_stmt).fetchone()
extracted_value = result['extracted_value']
SQLAlchemy玩转MySQL JSON数据类型
MySQL的JSON数据类型介绍以及JSON的解析查询
https://cloud.tencent.com/developer/article/2068668
使用SQLAlchemy在MySQL上的JSON列中查询数组