
I am trying to run a simple test program with Flink's KafkaSource. I am using the following:

  • Flink 0.9
  • Scala 2.10.4
  • Kafka 0.8.2.1

I tested KafkaSource following the documentation as described here and here (adding the dependency and bundling the Kafka connector flink-connector-kafka in the plugin).

Here is my simple test program:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

However, compilation always complains that KafkaSource is not found:

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

What am I missing here?


3 Answers


I am an sbt user, so I used the following build.sbt:

organization := "pl.japila.kafka"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"

This let me run the program:

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

Output:

[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
Answered on 2015-07-15T07:32:01.810

The problem seems to be that SBT and Maven profiles do not play well together.

The Flink POMs refer to the Scala version (2.10, 2.11, ...) as a variable, and some of these variables are defined in build profiles. SBT does not evaluate the profiles correctly, so the packaging does not work properly.

There is an issue and a pending pull request to fix this: https://issues.apache.org/jira/browse/FLINK-2408

Answered on 2015-08-10T14:06:29.093
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object FlinkKafkaStreaming {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "flink-kafka")
    // read the topic as a stream of strings
    val stream = env.addSource(
      new FlinkKafkaConsumer08[String]("your_topic_name", new SimpleStringSchema(), properties))
    // write the consumed records to a local text file
    stream.setParallelism(1).writeAsText("your_local_dir_path")
    env.execute("XDFlinkKafkaStreaming")
  }
}
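
This example uses the newer FlinkKafkaConsumer08 API rather than the old KafkaSource. As a rough sketch only (the artifact names and version numbers below are my assumptions about a Flink 1.x setup, not taken from the answer), the sbt dependencies could look something like this:

// Hedged sketch of build.sbt entries for the FlinkKafkaConsumer08 example above;
// adjust the Flink version to whatever release you actually run.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"     % "1.4.2",
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.4.2"
)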

To test this, you can do the following:

  1. Run the Flink demo first;
  2. Run the Kafka producer (a minimal producer sketch follows below);
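
For step 2, a minimal Kafka 0.8 producer might look like the sketch below. The topic name mirrors the placeholder used in the consumer above; the broker address and everything else is an assumption for illustration, not code from the original answer.

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object TestProducer {
  def main(args: Array[String]) {
    // Assumed broker address; must match the broker the Flink job reads from.
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    // Send a few test messages to the topic consumed by FlinkKafkaStreaming.
    (1 to 10).foreach { i =>
      producer.send(new KeyedMessage[String, String]("your_topic_name", s"message $i"))
    }
    producer.close()
  }
}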

Answered on 2018-12-27T07:01:41.147