2012-04-25 222 views
28

我的应用程序正在使用作用域会话和SQLALchemy的声明式样式。这是一个web应用程序,许多数据库插入由Celery执行,一个任务调度程序。在插入SQLAlchemy(声明式)时处理重复的主键

通常情况下,决定插入的对象时,我的代码可能会做大致如下的内容:

from schema import Session 
from schema.models import Bike 

pk = 123 # primary key 
bike = Session.query(Bike).filter_by(bike_id=pk).first() 
if not bike: # no bike in DB 
    new_bike = Bike(pk, "shiny", "bike") 
    Session.add(new_bike) 
    Session.commit() 

这里的问题是,由于很多,这是通过异步的工人来做,有可能为一个虽然插入Bikeid=123,而另一个正在检查其存在,但工作中途中断。在这种情况下,第二个worker将尝试使用相同的主键插入一行,并且SQLAlchemy将引发一个IntegrityError

我不能为我的生活找到一个很好的方式,从换出Session.commit()除了解决这个问题:

'''schema/__init__.py''' 
from sqlalchemy.orm import scoped_session, sessionmaker 
Session = scoped_session(sessionmaker()) 

def commit(ignore=False): 
    try: 
     Session.commit() 
    except IntegrityError as e: 
     reason = e.message 
     logger.warning(reason) 

     if not ignore: 
      raise e 

     if "Duplicate entry" in reason: 
      logger.info("%s already in table." % e.params[0]) 
      Session.rollback() 

然后我到处有Session.commit我现在有schema.commit(ignore=True)哪里我不不介意该行不再被插入。

对我来说这似乎非常脆弱,因为字符串检查。正如一个供参考,当IntegrityError提高它看起来像这样:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'") 

所以当然是我被插入主键是像Duplicate entry is a cool thing话,我想我可能会错过IntegrityError的这实际上不是因为重复的主键。

是否有更好的方法,它保持我用干净的SQLAlchemy的方法(而不是开始写出来的字符串等语句。)

Db的是MySQL的(尽管单元测试我喜欢使用SQLite,并不想用任何新方法来阻止这种能力)。

干杯!

+3

你为什么不考虑使用自动递增为您生成主键?那么你不必担心这个问题。 还是有没有这样做的具体原因? – mata 2012-04-25 19:40:39

+0

有一个特定的原因(对不起,这个例子有点琐碎)。 – Edwardr 2012-04-25 19:50:37

回答

6

您应该以相同的方式处理每个IntegrityError:回滚事务,并且可以再次尝试。一些数据库甚至不会让你在IntegrityError之后做更多的事情。你也可以在表上获得锁,或者在数据库允许的情况下获得更细粒度的锁,在两个冲突的事务开始时。

使用with语句显式开始一个事务,并自动提交(或回滚上的任何异常):

from schema import Session 
from schema.models import Bike 

session = Session() 
with session.begin(): 
    pk = 123 # primary key 
    bike = session.query(Bike).filter_by(bike_id=pk).first() 
    if not bike: # no bike in DB 
     new_bike = Bike(pk, "shiny", "bike") 
     session.add(new_bike) 
+0

嗨。我不打算在同一时间安排插入和检查。问题在于该对象碰巧是由两个独立的进程以临时方式创建的。没有什么不愉快的事情,它只是应用程序的方式(事实上对象不是自行车,它们是*次*)。然而,你说的是运行单个工人。我正在研究如何指定单个工作人员管理所有与数据库相关的任务,这将提供我需要的同步性。从应用程序插入不是一个选项。数据库在远程机器上,我需要100毫秒的Web-app响应。 – Edwardr 2012-04-26 09:03:23

+0

设计几乎总是要归咎于这些类型的SQL问题。例如,你确定你不能让数据库的主键自动递增,并处理偶尔的'两行'以前的主键列'结果? – joeforker 2012-04-26 15:15:10

+0

[对不起,我应该补充一点,PK没有自动增量是有原因的]我只是不确定我是否同意。数据库由许多其他应用程序共享,包括使用有问题的表格。为什么在你做了一些尽职调查之后,数据库可能会插入我的另一个进程/应用程序/人类,这是不好的设计?关键是你必须在你的应用中处理这个问题。我的问题很简单,我可以看到在SQLAlchemy中处理这个问题的唯一方法是通过字符串检查,而且它看起来并不健壮。 – Edwardr 2012-04-26 21:53:59

3

我假设你的主键下面是一些自然的方式,这就是为什么你不能依靠正常的自动增量技术。所以,让我们说这个问题真的是你需要插入的一个独特的列,这是更常见的。

如果您想要“尝试插入,在故障时部分回滚”,那么您使用SAVEPOINT,它与SQLAlchemy是begin_nested()。下一个rollback()或commit()只作用于SAVEPOINT,而不是更大范围的事情。

但是,总体来说,这里的模式只是一个应该避免的模式。你真正想在这里做的是三件事之一。 1。不要运行处理需要插入的相同密钥的并发作业。 2.以某种方式同步正在使用的并发密钥上的作业3.使用一些通用服务来生成这种特定类型的新记录,由作业共享(或确保它们在作业运行之前都已设置好)。

如果你考虑一下,#2在任何情况下都会发生高度的隔离。开始两个postgres会话。第1节:

test=> create table foo(id integer primary key); 
NOTICE: CREATE TABLE/PRIMARY KEY will create implicit index "foo_pkey" for table "foo" 
CREATE TABLE 
test=> begin; 
BEGIN 
test=> insert into foo (id) values (1); 

会议2:

test=> begin; 
BEGIN 
test=> insert into foo(id) values(1); 

你将看到的是,会话2块,与PK#1的行被锁定。我不确定MySQL是否足够聪明来做到这一点,但这是正确的行为。如果OTOH试图插入不同的PK:

^CCancel request sent 
ERROR: canceling statement due to user request 
test=> rollback; 
ROLLBACK 
test=> begin; 
BEGIN 
test=> insert into foo(id) values(2); 
INSERT 0 1 
test=> \q 

它进行得很好,没有阻塞。

问题是如果你正在做这种PK/UQ竞争,你的芹菜任务将会自行序列化无论如何,或者至少,他们应该是。

23

如果您使用session.merge(bike)而不是session.add(bike),那么您将不会生成主键错误。 bike将根据需要进行检索和更新或创建。

+6

如果你使用合并,你仍然可以得到完整性错误,如果你在不同的会话上同时进行两次合并。 – Sjoerd 2015-09-17 09:57:38

+0

当会话适合内存时,这个答案很好,但对于较大的查询不太好。所以如果你想添加比内存更多的数据,你不能只创建一堆会话并合并它们,对吧? – elplatt 2016-05-23 18:39:29

2

而不是session.add(obj)你需要使用下面提到的代码,这将是更干净,你不需要像你所说的使用自定义提交功能。然而,这将忽略冲突,不仅对重复键而且对其他人也是如此。

的mysql:

self.session.execute(insert(self.table, values=values, prefixes=['IGNORE'])) 

sqlite的

self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))