我正在尝试使用Alpakka Kafka 连接器 (Akka Stream Kafka)创建一个简单的原型。
运行应用程序时出现以下错误:
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
我有以下代码./src/main/scala/App.scala
:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
println("Hello from producer")
implicit val system = ActorSystem("fakeProducer")
implicit val materializer: Materializer = ActorMaterializer()
val config = system.settings.config // ConfigFactory.load()
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val done: Future[Done] =
Source(1 to 100)
.map(_.toString)
.map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
.runWith(Producer.plainSink(producerSettings))
println("Done")
}
}
以下build.sbt
:
name := "test-akka-stream"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
我使用sbt run
. 我还没有配置任何 uber/assembly jar。
我可能遗漏了一些明显的东西,但我看不到它......我怀疑 akka 依赖项存在一些问题。
更新
正如@terminally-chill 调用ProducerSettings(system, new StringSerializer, new StringSerializer)
(传递ActorSystem
而不是配置)所建议的那样解决问题。我只是不明白这是设计使然还是错误。
更新 2
我创建了一个已经修复的github 问题。现在文档更准确,并解释了创建ProducerSettings
/的正确方法ConsumerSettings
。
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
或者你可以通过ActorSystem
上面的解释。