0

我有一个 Java 应用程序,它应该从 Kafka 读取数据,做一些魔术并将数据发送给 Druid。

我有 Kafka 工作线程(每个主题大约 15 个),它们使用来自 Kafka 的数据并最终使用 Tranquillity 将其发送给 Druid。

这就是问题所在:如果我使用一个线程 - 一切都很好。如果我与多个人一起工作,我会遇到异常。

我尝试按以下方式工作:

  1. 带有几个 Tranquility 对象的 Spring Druid 服务。
  2. 没有 Spring,只需为每个线程创建几个 Tranquility 对象。

我认为这可能是并发问题。

当我说“几个宁静”时,我的意思是我将数据发送到不同的表。

我得到:

java.lang.AssertionError: assertion failed: List(package nio, package nio, package nio, package nio, package nio, package nio, package nio)
at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:44) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21) ~[scala-reflect-2.10.5.jar!/:na]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$typecreator3$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$TypeTag$class.equals(TypeTags.scala:256) ~[scala-reflect-2.10.5.jar!/:na]
at scala.reflect.api.TypeTags$TypeTagImpl.equals(TypeTags.scala:291) ~[scala-reflect-2.10.5.jar!/:na]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:166) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1.apply(DruidBeams.scala:152) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:341) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:204) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:123) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.metamx.tranquility.druid.DruidBeams.fromConfig(DruidBeams.scala) ~[tranquility-core_2.10-0.8.2.jar!/:0.8.2]
at com.cooladata.etl.rt.connector.DruidConnector.registerSender(DruidConnector.java:57) 
at com.cooladata.etl.rt.util.DruidUtil.emmitEvent(DruidUtil.java:26) 
4

1 回答 1

0

我会建议采取其他报酬。首先,您应该知道,宁静会丢弃超过某个指定时间段的事件,因为我记得默认为一小时。如果您只关心最近的事件,那没关系。由于您已经拥有 Kafka,我建议您将处理后的数据推送回 Kafka,并使用 Tranquility 或Kafka 索引器服务将数据发送到 Druid。这种方法的好处是不仅 Druid,其他服务也可以使用这些数据。此外,与 Tranquility 相比,Kafka 索引器服务不会丢弃任何事件。

于 2017-06-19T06:08:04.623 回答