2

我正在尝试使用Alpakka Kafka 连接器 (Akka Stream Kafka)创建一个简单的原型。

运行应用程序时出现以下错误:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'

我有以下代码./src/main/scala/App.scala

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system = ActorSystem("fakeProducer")
    implicit val materializer: Materializer = ActorMaterializer()

    val config = system.settings.config // ConfigFactory.load()

    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    val done: Future[Done] =
      Source(1 to 100)
        .map(_.toString)
        .map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
        .runWith(Producer.plainSink(producerSettings))


    println("Done")
  }
}

以下build.sbt

name := "test-akka-stream"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

我使用sbt run. 我还没有配置任何 uber/assembly jar。

我可能遗漏了一些明显的东西,但我看不到它......我怀疑 akka 依赖项存在一些问题。

更新

正如@terminally-chill 调用ProducerSettings(system, new StringSerializer, new StringSerializer)(传递ActorSystem而不是配置)所建议的那样解决问题。我只是不明白这是设计使然还是错误。

更新 2

我创建了一个已经修复的github 问题。现在文档更准确,并解释了创建ProducerSettings/的正确方法ConsumerSettings

val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

或者你可以通过ActorSystem上面的解释。

4

4 回答 4

2

感谢@terminally-chill 和@murray-todd-williams 的回答。我做了一些进一步的研究,我在这里尝试总结一下:

两者ConsumerSettingsProducerSettings都有apply一个Config(见这里)或一个ActorSystem(见这里)的函数。

问题是使用ActorSystem代码时是:

val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload

而使用Config代码时是:

val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))

因此,当直接传递配置时,代码会搜索kafka-clients属性,而不是在传递ActorSystem代码时检查akka.kafka.consumer/akka.kafka.producer.

最后考虑,在默认情况下创建ActorSystem实例时,大多数设置都是从​​嵌入reference.conf文件加载的,并与您的application.conf文件(如果存在)合并。更多信息在这里。所以基本上唯一需要设置的属性通常是bootstrap.servers.

所以你现在可以理解为什么使用system.settings.config代码时不起作用了。此配置实例已加载reference.conf(所有默认值,请参见此处)和自定义application.conf。属性在kafka-clients里面akka.kafka.consumer/akka.kafka.producer,但代码直接检查kafka-clients

一些可能的解决方案:

  • 直接通过ActorSystemusing other 重载
  • 使用正确的部分system.settings.config.getConfig("akka.kafka.consumer")
  • 使用部分手动构造Config实例kafka-clients

对我而言,问题在于此处提供的官方文档没有提及这些差异,并且提供的示例不完整和/或不准确。对于 Akka 专家来说,这可能很清楚,但对于新开发人员来说,这可能会非常混乱。

我在这个要点中创建了一个更“准备使用”的示例并打开一个问题

于 2018-06-20T08:16:31.257 回答
2

感谢您注意到并提交 Alpakka Kafka 连接器项目中的问题。文档现已更新:https ://doc.akka.io/docs/akka-stream-kafka/current/producer.html

于 2018-06-20T14:29:26.567 回答
1

通常我将所有配置保存在 AkkaSystem 实例中。我不将 Alpakka 用于 Kafka,但我的许多实现都基于源代码。

加载类型安全的配置对象,然后val config = ConfigFactory.load()传入config.val system = ActorSystem("fakeProducer", config)

最后,传递system.settings.configProducerSettings.

您当前的代码没有传递任何设置,因为您还没有将配置加载到您的 Akka 系统中。您val config = system.settings.config正在引用一个空配置,它没有 kafka-clients 部分(最佳猜测)。

于 2018-06-19T11:56:58.283 回答
1

我想我遇到了同样的问题(几乎在同一时间),尽管我正在尝试创建一个基本的“hello world”Kafka 消费者而不是生产者。我猜您只是在浏览Alpakka Kafka 连接器文档中的文档,并按照他们第一次定义的示例进行操作

val config = system.settings.config

然后将其传递给新的 ConsumerSettings 对象。我猜测在线文档存在缺陷,但我对 Akka Streams 足够陌生(这是我第一次尝试通过示例学习),我没有资格确切地弄清楚什么是对或错。

我曾尝试创建和 application.conf 文件,然后执行 ConfigFactory.load(),然后在创建时手动将其传递给 ActorSystem,然后我将该系统传递给 ConsumerSettings 构造函数,以及关于缺少“kafka”的错误-clients”消失了,但显然我什至不必这样做。正如您所说,只需传递“系统”变量而不是“配置”变量就可以了。

希望这可以帮助任何在黑暗中摸索的人。我不得不说,对于 Akka Streams 周围的所有嗡嗡声,似乎真的缺乏文档。一旦我弄清楚了这些东西,我可能不得不写一篇博客文章!

于 2018-06-19T20:10:29.143 回答