11

我遇到了带 Spark 的结构化流,它有一个从 S3 存储桶持续消费并将处理后的结果写入 MySQL 数据库的示例。

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")

这如何与Spark Kafka Streaming一起使用?

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

有没有办法在不使用的情况下结合这两个例子stream.foreachRDD(rdd => {})

4

2 回答 2

12

有没有办法在不使用的情况下结合这两个例子 stream.foreachRDD(rdd => {})

还没有。Spark 2.0.0 不支持结构化流的 Kafka 接收器。根据Spark Streaming 的创建者之一 Tathagata Das的说法,这是 Spark 2.1.0 中应该出现的一个功能。这是相关的 JIRA 问题

编辑:(29/11/2018)

是的,Spark 2.2 版以后可以。

stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

查看此SO 帖子(使用 Spark 流读取和写入 Kafka 主题)了解更多信息。

编辑:(2016 年 6 月 12 日)

Spark 2.0.2现在对结构化流的 Kafka 0.10 集成进行了expiramentaly 支持

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
于 2016-09-01T17:48:53.187 回答
3

从 Kafka 源读取并写入 Cassandra 接收器时,我遇到了类似的问题。在这里创建了一个简单的项目kafka2spark2cassandra,以防万一它对任何人都有帮助。

于 2017-01-05T20:25:52.270 回答