[注意]问题是Lagom框架的具体问题!Lagom PubSubRef订阅者删除消息
在我目前的项目中,当上流速度很高并且看起来下游无法及时处理所有消息时,观察到将来自Source的消息列表剪切成Kafka主题发布者的问题已被观察到。由于意识到,切割有关PubSubRef.subscribe()方法https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl/pubsub/PubSubRef.scala#L85
行为的完整的方法定义:
def subscriber(): Source[T, NotUsed] = {
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead)
.mapMaterializedValue { ref =>
mediator ! Subscribe(topic.name, ref)
NotUsed
}.asJava
}
有OverflowStrategy.dropHead使用。是否可以更改为使用back-pressure strategy?
UPD#1: 用例是非常简单的,当一个查询请求被发布到命令主题,把它和查询从数据库表对象,结果列表被推入结果卡夫卡的话题。代码段:
objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC));
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
Flow.fromFunction(this::deserializeCommandAndQueryObjects)
.mapAsync(concurrency, objects -> objects)
.flatMapMerge(concurrency, objects -> objects)
.alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object)))
.to(objectsResultTopic.publisher()),
Source.repeat(Done.getInstance())
)
)
在对象的情况下,流生成由deserializeCommandAndQueryObjects
函数大于默认缓冲器大小= 1000它启动切割元件(我们的情况下是〜2.5K对象)。
UPD#2: 对象数据的来源是:
// returns CompletionStage<Source<CustomObject, ?>>
jdbcSession.withConnection(
connection -> Source.from(runQuery(connection, rowConverter))
)
而且还有一个订阅卡夫卡objectsResultTopic
:
TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> {
JsonNode node = mapper.convertValue(gm, JsonNode.class);
return Pair.create(node, offset);
}));
发布到'objectQueryTopic'中的数据的来源是什么?什么是订阅'objectsResultTopic'重申,你在这里使用的'objectsResultTopic'的API并不使用Kafka。 –
我可能会被一堆代码片段混淆,但主要想法是获取PubSubRef'ObjectsResultTopic'并订阅Kafka topic = OBJECTS_RESULT_TOPIC,并将通过流加载的数据从DB源加载到objectsResultTopic.publisher()中以发布它们在结果主题。 – VRomaN
对于StackOverflow问答,这可能会有点复杂,是的:)我认为底线是'PubSubRef'不是这项工作的最佳工具。这听起来像是你想要从卡夫卡主题中读取数据,转换数据,然后将结果写入另一个主题。那是对的吗? –