0

在 kafka 中,我动态地获取新主题,我必须使用来自特定偏移量的火花流来处理它。是否有可能从变量传递 json 值。例如考虑下面的代码

val df = spark
 .read
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribePattern", "topic.*")
 .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
 .load()

在这个我想动态更新startingOffsets的值......我试图传递字符串中的值并调用它但它没有工作......如果我在startingOffsets中给出相同的值它正在工作。在这种情况下如何使用变量?

val start_offset= """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}"""
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", start_offset)
  .load()
java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}"""
4

2 回答 2

0
def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ReadSpecificOffsetFromKafka");
    val spark = SparkSession.builder().config(conf).getOrCreate();
    spark.sparkContext.setLogLevel("error");
    import spark.implicits._;

    val start_offset = """{"first_topic" : {"0" : 15, "1": -2, "2": 6}}"""
    val fromKafka = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092, localhost:9093")
      .option("subscribe", "first_topic")
//      .option("startingOffsets", "earliest")
      .option("startingOffsets", start_offset)
      .load();

    val selectedValues = fromKafka.selectExpr("cast(value as string)", "cast(partition as integer)");

    selectedValues.writeStream
      .format("console")
      .outputMode("append")
//      .trigger(Trigger.Continuous("3 seconds"))
      .start()
      .awaitTermination();
  }

这是使用 spark 结构化流和 scala 从 kafka 获取特定偏移量的确切代码

于 2020-10-03T12:27:11.860 回答
-1
  • 看起来您的工作是检查将 Kafka 偏移量指向一些持久存储。试着清理那些。并重新运行您的作业。
  • 还可以尝试重命名您的作业并运行它。

Spark 可以通过readStream. 因此,请尝试使用错误消息中显示的偏移量来消除错误。

spark
  .readStream
  .format("kafka")
.option("subscribePattern", "topic.*")
于 2020-06-03T13:29:14.770 回答