2017-03-07 83 views
1

实现并行批量处理队列当我使用Informix SQL工作有可能使用简单的更新的事务隔离来创建一个处理队列是这样的:在Neo4j的Cypher支架

UPDATE Queue 
SET processed_by = ? 
WHERE processed_by IS NULL 
ORDER BY inserted_at 
LIMIT 3 

这标志着一批处理器在第一个参数(?,例如服务器名称和线程号)中最多处理3个项目(队列可能为空)。在此之后UPDATE完成每个并行处理器上(分离的线程或机器),该队列是分片:

+-----+------------------+---------------+ 
| _id | inserted_at | processed_by | 
+-----+------------------+---------------+ 
| 1 | 2017-03-07 01:15 | host2:thread3 | 
| 2 | 2017-03-07 01:16 | host2:thread3 | 
| 3 | 2017-03-07 01:17 | host2:thread3 | 
| 4 | 2017-03-07 01:18 | host1:thread1 | 
| 5 | 2017-03-07 01:19 | host1:thread1 | 
| 6 | 2017-03-07 01:20 | host1:thread1 | 
| 7 | 2017-03-07 01:21 | NULL   | 
+-----+------------------+---------------+ 

和每个处理器都可以在隔离自身工作对一批:

SELECT * from Queue WHERE processed_by = ? 

,而他们可以肯定没有人接触他们的物品来处理。作为一个副作用,如果一个处理器死了,他们很容易从他们离开的地方拿起。


我试图实现与Cypher支架同样的事情,但我遇到了问题时,并行读取遵守相同的状态,因此更新相同的项目:

MATCH (n:Queue) 
WHERE NOT exists(n.processed_by) 
SET n.processed_by = $processor 
RETURN n 
LIMIT 3 

建立在此我能够lock items并做处理与此:

MATCH (n:Queue) 
SET n._LOCK_ = true 
WITH n 
WHERE NOT exists(n.processed_by) 
SET n.processed_by = $processor 
REMOVE n._LOCK_ 
RETURN n 
LIMIT 3 

的问题是,在_LOCK_适用于所有队列的节点,而不是仅仅3如果它们中有很多,可能会导致性能下降。如果有时一个处理器会锁定所有可用项目,那么它将不会成为问题,因为其他处理器将锁定所有可用项目。

用Cypher表达这个问题的正确方法是什么?
如果可能,我想还包括订购(请参阅SQL中的inserted_at)。
我也很乐意让它在没有限制的情况下工作,所以只需标记单个项目而不会发生冲突。

注意:我希望这可以不使用任何服务器端扩展。

回答

1

[增订]

与第一查询的问题是,如果另一线程/进程已经有一个节点上的锁(通过设置节点属性;在这种情况下,processed_by),试图设置一个只在同一节点上写锁定暂时阻止您的线程/进程代码继续进行。一旦其他线程/进程完成其处理,其写入锁定将被释放,并且您的代码将继续设置processed_by--它将覆盖其他线程/进程先前写入的内容!

如您所知,第二个查询的问题是它会锁定所有Queue节点,这会阻止您使用多个线程/进程获得的任何性能改进。

这可能会为你工作:

MATCH (n:Queue) 
WHERE NOT EXISTS(n.processed_by) 
WITH n LIMIT 3 
SET n._LOCK_ = true 
WITH COLLECT(n) AS lockedNodes 
WITH REDUCE(s = [], n IN lockedNodes | 
    CASE WHEN NOT EXISTS(n.processed_by) THEN s + n ELSE s END) AS nodesToSet, lockedNodes 
FOREACH(x IN nodesToSet | SET x.processed_by = $processor) 
FOREACH(x IN lockedNodes | REMOVE x._LOCK_) 
RETURN nodesToSet, lockedNodes; 

该查询首先获取最多3个节点,而不processed_by财产,并试图(通过设置其属性_LOCK_)来设置他们每个人写锁。这可以避免锁定所有Queue节点。

如果其他线程/进程已经在一个或多个相同节点上有写锁定,则线程/进程将被阻塞,直到释放这些锁。在线程/进程获取所有写入锁之后,某些其他节点上的processed_by属性可能已由其他一些线程/进程设置。因此,此查询将第二次测试processed_by属性的存在性,并且仅在它不存在时才设置它。子句RETURN不仅返回您更改的节点的集合,还返回最初找到的和锁定的节点(可能比您更改的节点的集合更大)的集合。

+1

您可能希望使用WITH WITH DISTINCT 1 AS忽略,因为前一次匹配的基数将保持不变。最好把它弄到一排。 – InverseFalcon

+0

谢谢,稍后当我到达我的电脑后再尝试。我到目前为止唯一的问题(仅基于阅读本文)是为什么我们需要创建队列节点?排队的项目来自外部来源,由其他内容插入,包含要处理的数据/关系。处理器不应该创建它们,因此SQL示例中的'UPDATE'语句并没有看到'INSERT INTO'。注意:为了强调,如果没有两个查询同时运行,我的第一个Cypher示例(使用'SET')似乎可以在很大的来源上工作,但我无法保证。 (通知:@InverseFalcon) – TWiStErRob

+0

我完全改变了我的答案,因为我最初并不理解你的问题。我还没有测试过新的答案,但对我来说似乎是合理的:-)。 – cybersam

0

如果锁定队列节点的唯一原因是用于写入processed_by属性,则可以考虑锁定指定为:QueueLock的其他节点。它的唯一目的是被锁定,所以你可以执行你的队列操作。

这当然需要所有队列操作(用于标记为processed_by,以及可能影响该操作的任何内容)来锁定:QueueLock。

您还需要在SET操作之前限制节点,因为在当前查询中,您只限制节点的返回(它返回3,但在此之前它将所有未分配的节点分配给单个节点处理器)。

MATCH (lock:QueueLock) 
SET lock._LOCK_ = true 
MATCH (n:Queue) 
WITH n 
WHERE NOT exists(n.processed_by) 
LIMIT 3 
SET n.processed_by = $processor 
REMOVE lock._LOCK_ 
RETURN n 

但是,它可能有助于在节点上使用不同的标签,因此分配的节点上的操作不必等待锁定。

如果你继续:队列排队,未分配节点(所以processed_by永远不会存在,而节点在队列中),以及:分配给指定的节点进行处理,这应该为你工作:

MATCH (lock:QueueLock) 
SET lock._LOCK_ = true 
MATCH (n:Queue) 
WITH n 
LIMIT 3 
SET n.processed_by = $processor 
REMOVE n:Queue 
SET n:Assigned 
REMOVE lock._LOCK_ 
RETURN n 
+0

嗯,改变节点标签看起来很有趣。我会看到它可以与Neo4J-OGM一起工作。 – TWiStErRob

+0

它理论上看起来不错,但我无法让它工作。我得到一个错误,说'SET'和'MATCH'之间需要'WITH',所以我更新编译并运行:MATCH(lock:QueueLock) SET lock._LOCK_ = true WITH lock MATCH(n:队列) WITH lock,n LIMIT 3 ...',但行为很奇怪:每个处理器都处理一个批处理,但是原始队列中的第一个项目被重新处理。这是与标签分配版本,第一个版本表现更糟糕。 – TWiStErRob

+0

良好的捕获,我错过了那些。我必须捅一下,看看造成这种奇怪行为的原因。 – InverseFalcon

0

@cybersamsolution的另一变型我作为我最后的代码(包括排序):根据我的测试

// find all of the queued items 
MATCH (n:Queue) 
    // that haven't been marked yet 
    WHERE NOT exists(n.processed_by) 
// take the first $count of those and lock them 
WITH n 
    ORDER BY n.something ASCENDING 
    LIMIT $count 
    SET n._LOCK_ = true 
    // write lock will be released after RETURN "commits", but clean up sooner 
    REMOVE n._LOCK_ 
// after locking check if they're still unmarked 
// (between the first check and locking they could have been marked by others) 
WITH n 
    WHERE NOT exists(n.processed_by) 
    // and mark these unmarked ones as ours (count(n) <= than original limit) 
    SET n.processed_by = $processor, n.processing_started = timestamp() 
RETURN n; 

的行为是一样的,但也没必要势在必行Cypher支架。

我相信通过对变量名称(WITH n as x)的某些操作,也可以得到“锁定但未标记”的节点,但我并不需要这些实现细节作为查询的结果。