2016-11-20 73 views
4

我已经阅读Jonathan Oliver关于处理乱序事件的好消息。在CQRS读取端处理乱序事件

http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/

,我们用的是出队的消息,并把它放在一个“支持台”,直到与前一个序列的所有消息都收到 的解决方案。当收到所有先前的消息时,我们将所有的 消息从保留表中取出,并依次通过 合适的处理程序运行。一旦所有处理程序都成功执行 ,我们将从保留表中删除消息,并提交 更新到读取模型。

这适用于我们,因为该域名发布事件并使用适当的序列号标记它们 。没有这个,下面的解决方案 会更困难 - 如果不是不可能的话。

该解决方案使用关系数据库作为持久性存储 机制,但我们没有使用存储引擎的任何关系方面。与此同时,所有这一切都有一个警告。 如果消息2,3和4到达但消息1从未做过,我们不会将它们中的任何一个应用于 。只有在处理消息1时出现错误 或者消息1以某种方式丢失时才会发生该情况。幸运的是, 很容易纠正我们的消息处理程序中的任何错误,并且 重新运行消息。或者,在丢失消息的情况下,直接从事件存储器重新构建 。

有几个问题,特别是他如何说我们总是可以向活动商店索要失踪事件。

  1. CQRS的写入端是否必须公开读取 端的服务以“请求”重放事件?例如,如果事件1不是收到的 ,而是2,3,4,我们是否可以通过 服务请求事件库重新发布从1开始的事件?
  2. 这个服务是CQRS写端的责任吗?
  3. 我们如何使用它重新构建读取模型?
+0

我们使用RabbitMq的“重试”方法,它工作正常。如果经过多次重试仍然无效 - 只需将该事件置于死信队列中并重置序列号,以便可以正确处理更多事件。通常在您的应用程序中乱序事件的原因是什么? – IlliakaillI

+0

我有一些会生成多个事件的特定命令。我还没有执行任何操作,但可能会出现乱序事件。我的活动发布者也是异步工作的。所以有可能某些事件可能无法按顺序发布。我依靠我的事件序列号来帮助我重新组合。 我会尝试重试方法。如果你可以详细说明一下,我可以将其标记为答案。 –

+0

我在答案的评论部分添加了更详细的解释。 – IlliakaillI

回答

1

如果您有一个序列号,那么您可以检测当前事件发生故障的情况,例如, currentEventNumber!= lastReceivedEventNumber + 1

一旦你发现了,你只是抛出一个异常。如果您的订阅者拥有“重试”机制,它将尝试在一秒钟内重新处理此事件。在这段时间内,有一个相当好的机会将会处理更早的事件并且顺序将是正确的。如果无序事件很少发生,这是一个解决方案。

如果您经常遇到这种情况,您需要实现全局锁定机制,这将允许某些事件按顺序处理。 例如,我们在MSSQL中使用sp_getapplock来实现某些情况下的全局“临界区”行为。当分布式应用程序的多个部分需要的不仅仅是一个简单的锁时,Apache ZooKeeper提供了一个处理更复杂场景的框架。

+0

我正在研究某些多人游戏如何处理这种情况。游戏内置高速缓存,持续大约100ms。在应用事件之前等待100ms,以防事件丢失。由于潜在的可伸缩性问题,我有点犹豫是否使用任何锁。顺便说一句,你如何向活动商店询问实施中的遗漏事件? –

+0

如果你想使这个系统健壮,你必须非常小心聚合端的各种缓存。如果你的应用程序突然失败会发生什么?根据我的经验,使用缓存事件的方法不会扩展。在我们的业务案例中,我们构建了分布式24/7容错服务器,这意味着您必须在单独的物理机器上至少有2个聚合器进程实例。如果你想避免裂脑情况 - 你必须考虑有3个独立的实例并行运行。 – IlliakaillI

+0

>>您如何在事件存储库中查找您的实施中遗漏的事件? 好吧,我们使用rabbitmq,后来改用天蓝色的服务总线。这两项服务都提供了交付保证的功能。你基本上知道队列服务,你的事件在聚合器端的事务结束时已经被成功处理。 – IlliakaillI

0

你在这里描述的是事件采购(ES)。通过命令模型将发出的事件存储到持久存储器 通过事件类型,命令模型ids(聚合根ID),命令模型类型(聚合根类型)重播存储的事件。有ES的优势。您甚至可以稍后重播这些事件以生成新类型的查询模型。 使用ES方法也可以用于具有UnitOfWork范围的应用程序事务。在提交时,发出的事件被持久化并分发给事件监听器(QM维护服务)。提交阶段的验证应该包含通过db中的序列号检查并发访问。

+0

不幸的是,这并不回答任何发布的问题。我之前已经使用ES实现了CQRS,但是这个问题是特定于乱序/丢失事件的上下文的。 –

+0

不合时宜的情况很容易通过延迟交货解决。如果您检测到超出序列信息,您将延迟交付。但是交付本身应该是一致的。你选择什么样的信息传递方式?至少一次,至少一次或者正好一次。消息传递由基础设施(RabbitMQ,Kafka)处理?公式,卡夫卡将确保订单主题交付给每个组。甚至可以持续消息定义的时间,因此消费者可以离线几个小时,消息将被传递。 – hellxcz

+0

我使用RabbitMQ,它保证“至少一次”交付。所以消息幂等性也必须在事件处理器上实现。 –