2013-04-10 57 views
0

我有一个简单的过程,我需要处理表的记录,理想情况下运行多个进程的实例,而不处理相同的记录。我已经与MySQL这样做的方式是相当普遍的(虽然我认为令牌场更加黑客攻击的):PostgreSQL表上并行进程

添加几个字段的表:

CREATE TABLE records (
    id INTEGER PRIMARY KEY AUTO_INCREMENT, 
    ...actual fields... 

    processed_at DATETIME DEFAULT NULL, 
    process_token TEXT DEFAULT NULL 
); 

然后一个简单的处理脚本:

process_salt = md5(rand()) # or something like a process id 

def get_record(): 
    token = md5(microtime + process_salt) 
    db.exec("UPDATE records SET process_token = ? 
      WHERE processed_at IS NULL LIMIT 1", token) 
    return db.exec("SELECT * FROM records WHERE token = ?", token) 

while (row = get_record()) is valid: 
    # ...do processing on row... 

    db.exec("UPDATE records SET processed_at = NOW(), token = NULL 
      WHERE id = ?", row.id) 

我在一个使用PostgreSQL数据库的系统中实现这样一个过程。我知道Pg可以被认为比MySQL更成熟,关于MVCC锁定问题 - 我可以在Pg中使用行锁定还是其他功能,而不是令牌字段?

回答

1

这种做法将与PostgreSQL的工作,但它会往往是相当低效的,因为你要更新的每一行两次 - 每次更新需要交易,两次提交。通过使用commit_delay并可能禁用synchronous_commit,可以稍微缓解此问题的成本,但是,除非在存储子系统上有非易失性回写缓存,否则它的速度仍然不会很快。

更重要的是,由于您正在进行第一次更新,因此无法区分正在工作的工作人员和已经崩溃的工人之间的区别。如果所有工作人员都在本地计算机上,偶尔可以扫描丢失的PID,但这很麻烦,容易出现竞争情况,您可以将该标记设置为工作人员的进程标识,更不用说使用pid重用的问题了。

我会建议你采用,旨在解决这些问题,喜欢的ActiveMQ,RabbitMQ的,ZeroMQ等PGQ也可能是显著关心的真正排队的解决方案。

在事务性关系数据库中执行队列处理应该是很容易,但在实践中,做得很好并且得到正确的结果是很荒唐的。当仔细检查时,大多数看起来合理的“解决方案”实际上是对所有工作进行序列化(因此,只有许多队列工作人员中的任何一个正在做任何事情)。

+0

是的,这种方法总是觉得像用锤子敲钉子 - 我一直想看看ZeroMQ很长一段时间,所以我会走这条路。感谢您的洞察! – Ross 2013-04-11 20:08:57

0

您可以使用SELECT ... FOR UPDATE NOWAIT这将获得行上的排他锁,或者报告错误(如果它已被锁定)。