0
我有一个Java应用程序,应该从卡夫卡读取,做一些魔术并将数据发送给德鲁伊。从卡夫卡消耗并发送到德鲁伊问题
我有Kafka工作者线程(每个主题约15个),它们使用来自Kafka的数据并最终使用Tranquility将其发送给德鲁伊。
这是问题: 如果我使用一个线程 - 一切都很好。如果我与多个人合作,我会得到例外。
我想工作方式如下:
- 春德鲁伊的服务与多家宁静的对象。
- 没有Spring,只需为每个线程创建几个Tranquility对象。
我认为这可能是并发问题。
当我说“几个宁静”我的意思是我发送数据到不同的表。
我得到:
java.lang.AssertionError: assertion failed: List(package nio, package nio, package nio, package nio, package nio, package nio, package nio)
at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:44) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21) ~[scala-reflect-2.10.5.jar!/:na]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$typecreator3$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$TypeTag$class.equals(TypeTags.scala:256) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$TypeTagImpl.equals(TypeTags.scala:291) ~[scala-reflect-2.10.5.jar!/:na]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:152) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:341) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:204) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:123) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams.fromConfig(DruidBeams.scala) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.cooladata.etl.rt.connector.DruidConnector.registerSender(DruidConnector.java:57)
at com.cooladata.etl.rt.util.DruidUtil.emmitEvent(DruidUtil.java:26)