2016-12-06 126 views
0

我有一个Java应用程序,应该从卡夫卡读取,做一些魔术并将数据发送给德鲁伊。从卡夫卡消耗并发送到德鲁伊问题

我有Kafka工作者线程(每个主题约15个),它们使用来自Kafka的数据并最终使用Tranquility将其发送给德鲁伊。

这是问题: 如果我使用一个线程 - 一切都很好。如果我与多个人合作,我会得到例外。

我想工作方式如下:

  1. 春德鲁伊的服务与多家宁静的对象。
  2. 没有Spring,只需为每个线程创建几个Tranquility对象。

我认为这可能是并发问题。

当我说“几个宁静”我的意思是我发送数据到不同的表。

我得到:

java.lang.AssertionError: assertion failed: List(package nio, package nio, package nio, package nio, package nio, package nio, package nio) 
at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:44) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21) ~[scala-reflect-2.10.5.jar!/:na] 
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$typecreator3$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2] 
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.api.TypeTags$TypeTag$class.equals(TypeTags.scala:256) ~[scala-reflect-2.10.5.jar!/:na] 
at scala.reflect.api.TypeTags$TypeTagImpl.equals(TypeTags.scala:291) ~[scala-reflect-2.10.5.jar!/:na] 
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2] 
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:152) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2] 
at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:341) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2] 
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:204) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2] 
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:123) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2] 
at com.metamx.tranquility.druid.DruidBeams.fromConfig(DruidBeams.scala) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2] 
at com.cooladata.etl.rt.connector.DruidConnector.registerSender(DruidConnector.java:57) 
at com.cooladata.etl.rt.util.DruidUtil.emmitEvent(DruidUtil.java:26) 

回答

0

什么,我会建议正在采取其他付费。首先,你应该知道,宁静会让某些特定时间段的事件比较老旧,因为我记得默认情况下有一个小时。如果你只关心最近的事件,那就没关系。 既然你已经有了Kafka,我建议你将处理过的数据回推给Kafka,并使用宁静或Kafka indexer service来发送数据给德鲁伊。这种方法的好处是,不仅德鲁伊而且其他服务可以使用这些数据。此外,与宁静卡夫卡索引服务不会下降任何事件。