我有一个很重的用户数据流。我想通过它的id来确定这是否是新用户。为了减少对数据库的调用,我宁愿在先前用户的内存中维护一个状态。 val users = mutable.set[String]()
//init the state from db
user = db.getAllUsersIds()
val source: Source[User, NotUsed]
val dbSink:
我试图通过设置Kafka服务器并使用生产者发送消息来在本地测试我的代码,但我想知道是否有一种方法可以为此编写单元测试一段代码(测试消费者收到的消息是否正确)。 val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootst
我是Akka/Scala的新手,正在尝试调试下面的代码。当resultSetParser有一个例外时,它不会抛出它。相反,使用此代码的服务只是永远闲置。 如何让我的服务抛出异常,而不仅仅是在流中等待?在Akka中有没有类似watchException()的函数,我可以在watchTermination()之后调用它,使它在处理流时看到异常? val chunkSource: Source[Chun