如何在 postgresql 上使用 sqlalchemy 进行正确的更新插入?

2024-01-16

我想使用 sqlalchemy 核心使用 postgresql 9.5 添加的“新”功能进行更新插入。虽然它已实现,但我对语法感到非常困惑,它无法适应我的需求。 这是我希望能够执行的示例代码:

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
class User(Base):
    __tablename__ = 'test'
    a_id = Column('id',Integer, primary_key=True)
    a = Column("a",Integer)

engine = create_engine('postgres://name:password@localhost/test')
User().metadata.create_all(engine)
meta = MetaData(engine)
meta.reflect()
table = Table('test', meta, autoload=True)
conn = engine.connect()

from sqlalchemy.dialects.postgresql import insert as psql_insert
stmt = psql_insert(table).values({
    table.c['id']: bindparam('id'),
    table.c['a']: bindparam('a'),
})
stmt = stmt.on_conflict_do_update(
    index_elements=[table.c['id']],
    set_={'a': bindparam('a')},
)
list_of_dictionary = [{'id':1, 'a':1, }, {'id':2, 'a':2,}]
conn.execute(stmt, list_of_dictionary)

我基本上想插入大量行,如果一个 id 已被占用,我想用我最初想要插入的值来更新它。 然而 sqlalchemy 向我抛出这个错误:

CompileError: bindparam() name 'a' is reserved for automatic usage in the VALUES or SET clause of this insert/update statement.   Please use a name other than column name when using bindparam() with insert() or update() (for example, 'b_a').

虽然这是一个已知问题(请参阅https://groups.google.com/forum/#!topic/sqlalchemy/VwiUlF1cz_o https://groups.google.com/forum/#!topic/sqlalchemy/VwiUlF1cz_o),我没有找到任何不需要修改 list_of_dictionary 的键或列名称的正确答案。

我想知道是否有一种构造 stmt 的方法,以具有一致的行为,而不依赖于变量 list_of_dictionary 的键是否是插入表的列的名称(我的代码在这些中没有错误)例)。


这对我有用:

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.inspection import inspect

def upsert(engine, schema, table_name, records=[]):

    metadata = MetaData(schema=schema)
    metadata.bind = engine

    table = Table(table_name, metadata, schema=schema, autoload=True)

    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]

    # assemble base statement
    stmt = postgresql.insert(table).values(records)

    # define dict of non-primary keys for updating
    update_dict = {
        c.name: c
        for c in stmt.excluded
        if not c.primary_key
    }

    # cover case when all columns in table comprise a primary key
    # in which case, upsert is identical to 'on conflict do nothing.
    if update_dict == {}:
        warnings.warn('no updateable columns found for table')
        # we still wanna insert without errors
        insert_ignore(table_name, records)
        return None


    # assemble new statement with 'on conflict do update' clause
    update_stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict,
    )

    # execute
    with engine.connect() as conn:
        result = conn.execute(update_stmt)
        return result
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 postgresql 上使用 sqlalchemy 进行正确的更新插入? 的相关文章

随机推荐