2017-02-21 77 views
0

我试图实现一个使用Lagom 1.2.2设置的消息代理,并且碰到了墙壁。该文件对服务描述符下面的例子:Lagom消息代理的完整示例

default Descriptor descriptor() { 
return named("helloservice").withCalls(...) 
    // here we declare the topic(s) this service will publish to 
    .publishing(
    topic("greetings", this::greetingsTopic) 
) 
    ....; 
} 

而这个例子的实现:

public Topic<GreetingMessage> greetingsTopic() { 
return TopicProducer.singleStreamWithOffset(offset -> { 
    return persistentEntityRegistry 
     .eventStream(HelloEventTag.INSTANCE, offset) 
     .map(this::convertEvent); 
    }); 
} 

然而,有什么样的参数类型或返回convertEvent()函数的类型是没有例子,这是我画空白的地方。在另一端,在用户向MessageBroker,似乎它的消费GreetingMessage对象,但是当我创建了一个功能convertEvent返回GreetingMessage对象,我得到一个编译错误:

Error:(61, 21) java: method map in class akka.stream.javadsl.Source<Out,Mat> cannot be applied to given types; 
    required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 
    found: this::convertEvent 
    reason: cannot infer type-variable(s) T 
    (argument mismatch; invalid method reference 
    incompatible types: akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset> cannot be converted to com.example.GreetingMessage) 

是否有其他更彻底如何使用这个例子?我已经在Chirper示例应用程序中进行了检查,但似乎没有这方面的示例。

谢谢!

回答

3

您粘贴的错误消息告诉你到底是什么map预计:

required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 

所以,你需要通过一个函数,它Pair<GreetingEvent, Offset>。函数应该返回什么?那么,更新它来获取它,然后你会得到下一个错误,它会再次告诉你它期望你返回什么,在这种情况下,你会发现它是Pair<GreetingMessage, Offset>

要解释这些类型 - Lagom需要跟踪的事件已发布到卡夫卡,以便当您重新启动服务,它不会从您的事件日志的开头开始并重新从所有事件开始的时间再次。它通过使用偏移来实现这一点。因此,事件日志会生成事件和偏移量对,然后您需要将这些事件转换为将发布到Kafka的消息,并且当您将转换后的消息返回给Lagom时,它需要与偏移量成对您从事件日志中获得的信息,以便在向Kafka发布后,Lagom可以保留偏移量,并在下次重新启动服务时将其用作起始点。

完整的例子可以看这里:https://github.com/lagom/online-auction-java/blob/a32e696/bidding-impl/src/main/java/com/example/auction/bidding/impl/BiddingServiceImpl.java#L91

+0

感谢;我一直在仔细研究你提供的拍卖示例应用程序,它是有帮助的(尽管它使用'taggedStreamWithOffset'而不是'singleStreamWithOffset'(说实话,可能是我想要的) 部分I失踪的是'convertEvent'的参数;出于某些原因,我假设我搞乱了返回类型,即使当我的返回类型正确并导致我误入歧途时也是如此。 再次感谢您的帮助! –