我有一个 Java 应用程序,它应该从 Kafka 读取数据,做一些魔术并将数据发送给 Druid。
我有 Kafka 工作线程(每个主题大约 15 个),它们使用来自 Kafka 的数据并最终使用 Tranquillity 将其发送给 Druid。
这就是问题所在:如果我使用一个线程 - 一切都很好。如果我与多个人一起工作,我会遇到异常。
我尝试按以下方式工作:
- 带有几个 Tranquility 对象的 Spring Druid 服务。
- 没有 Spring,只需为每个线程创建几个 Tranquility 对象。
我认为这可能是并发问题。
当我说“几个宁静”时,我的意思是我将数据发送到不同的表。
我得到:
java.lang.AssertionError: assertion failed: List(package nio, package nio, package nio, package nio, package nio, package nio, package nio)
at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:44) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21) ~[scala-reflect-2.10.5.jar!/:na]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$typecreator3$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$TypeTag$class.equals(TypeTags.scala:256) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$TypeTagImpl.equals(TypeTags.scala:291) ~[scala-reflect-2.10.5.jar!/:na]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:152) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:341) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:204) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:123) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams.fromConfig(DruidBeams.scala) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.cooladata.etl.rt.connector.DruidConnector.registerSender(DruidConnector.java:57)
at com.cooladata.etl.rt.util.DruidUtil.emmitEvent(DruidUtil.java:26)