2017-07-03 99 views
0

[注意]问题是Lagom框架的具体问题!Lagom PubSubRef订阅者删除消息

在我目前的项目中,当上流速度很高并且看起来下游无法及时处理所有消息时,观察到将来自Source的消息列表剪切成Kafka主题发布者的问题已被观察到。由于意识到,切割有关PubSubRef.subscribe()方法https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl/pubsub/PubSubRef.scala#L85

行为的完整的方法定义:

def subscriber(): Source[T, NotUsed] = { 
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead) 
    .mapMaterializedValue { ref => 
    mediator ! Subscribe(topic.name, ref) 
    NotUsed 
    }.asJava 
} 

OverflowStrategy.dropHead使用。是否可以更改为使用back-pressure strategy

UPD#1: 用例是非常简单的,当一个查询请求被发布到命令主题,把它和查询从数据库表对象,结果列表被推入结果卡夫卡的话题。代码段:

objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC)); 
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects) 
     .mapAsync(concurrency, objects -> objects) 
     .flatMapMerge(concurrency, objects -> objects) 
     .alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object))) 
     .to(objectsResultTopic.publisher()), 
    Source.repeat(Done.getInstance()) 
    ) 
) 

在对象的情况下,流生成由deserializeCommandAndQueryObjects函数大于默认缓冲器大小= 1000它启动切割元件(我们的情况下是〜2.5K对象)。

UPD#2: 对象数据的来源是:

// returns CompletionStage<Source<CustomObject, ?>> 
jdbcSession.withConnection(
    connection -> Source.from(runQuery(connection, rowConverter)) 
) 

而且还有一个订阅卡夫卡objectsResultTopic

TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> { 
    JsonNode node = mapper.convertValue(gm, JsonNode.class); 
    return Pair.create(node, offset); 
})); 
+0

发布到'objectQueryTopic'中的数据的来源是什么?什么是订阅'objectsResultTopic'重申,你在这里使用的'objectsResultTopic'的API并不使用Kafka。 –

+0

我可能会被一堆代码片段混淆,但主要想法是获取PubSubRef'ObjectsResultTopic'并订阅Kafka topic = OBJECTS_RESULT_TOPIC,并将通过流加载的数据从DB源加载到objectsResultTopic.publisher()中以发布它们在结果主题。 – VRomaN

+0

对于StackOverflow问答,这可能会有点复杂,是的:)我认为底线是'PubSubRef'不是这项工作的最佳工具。这听起来像是你想要从卡夫卡主题中读取数据,转换数据,然后将结果写入另一个主题。那是对的吗? –

回答

1

如果有人有兴趣,最后我们解决了这个问题,通过使用阿卡制片API,如:

ProducerSettings<String, CustomObject> producerSettings = ProducerSettings.create(system, new StringSerializer(), new CustomObjectSerializer()); 
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects) 
     .mapAsync(concurrency, objects -> objects) 
     .flatMapMerge(concurrency, objects -> objects) 
     .alsoTo(Sink.foreach(object -> LOG.trace("Sending event {}", object))) 
     .map(object -> new ProducerRecord<String, CustomObject>(OBJECTS_RESULT_TOPIC, object)) 
     .to(Producer.plainSink(producerSettings)), 
    Source.repeat(Done.getInstance()))); 

它的工作原理没有缓冲,只是使推到卡夫卡的话题。

+0

太好了!在阅读之前我发布了我的最后一条评论,表明您解决了这个问题这看起来像是一个很好的解决方案。 –

3

这听起来像Lagom的distributed publish-subscribe功能可能不你有工作的最佳工具。

你的问题提到卡夫卡,但这个功能并没有利用卡夫卡。相反,它通过直接向群集中的所有用户广播消息来工作。这是一次“最多一次”的消息传递,它可能确实会丢失消息,并且针对那些关注最新消息而不是处理每一个消息的消费者。溢出策略不是可定制的,您不希望在这些用例中使用背压,因为这意味着一个缓慢的用户可能会减慢向所有其他用户的交付。

有,你有一些其他选项:

  1. 如果你想使用卡夫卡,你应该使用Lagom的message broker API。这支持“至少一次”的传递语义,并且可以用来确保每个消费者处理每个消息(代价是可能增加延迟)。在这种情况下,卡夫卡扮演着一个巨大的持久缓冲区,所以它甚至比背压更好:生产者和消费者可以以不同的速度前进,并且(当与partitioning一起使用时),您可以添加消费者以便扩展在需要时更快速地处理消息。

    当生产者和消费者都在同一个服务中时,可以使用消息代理API,但它特别适用于服务之间的通信。

  2. 如果您发送的消息是持久性实体事件,并且消费者是同一服务的一部分,那么persistent read-side processor可能是一个不错的选择。

    这也提供了“至少一次”发货,而如果处理消息的唯一影响是数据库更新,那么内置的Cassandra read-side databasesrelational read-side databases支持提供“有效的一次”语义,在数据库更新事务运行以确保在事件处理期间发生的故障不会导致部分更新。

  3. 如果您发送的消息是持久性实体事件,消费者是同一服务的一部分,但您希望将事件作为流处理,则可以访问raw stream of events

  4. 如果您的用例不符合Lagom明确支持的用例之一,则可以使用更低级别的Akka API(包括distributed publish-subscribe)来实现更适合您需求的某些内容。

最好的选择将取决于你的用例的具体情况:消息的来源和你想要的消费者类型。如果您通过更多详细信息更新您的问题并为此答案添加评论,我可以使用更具体的建议来编辑答案。

+0

你好!感谢您的回复,我更新了问题并添加了受此问题影响的代码段中的实际用例。 – VRomaN