1

我想根据 SparkStreaming 中数据的值更改 Kafka 主题目标以保存数据。有可能再次这样做吗?当我尝试以下代码时,它只执行第一个,但不执行较低的进程。

(testdf 
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
      )
            
(testdf 
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
      )

数据存储在主题名称 test 中。谁能想到一种方法来做到这一点?

我更改了目的地以保存这样的数据框。

|type|value|
| A  |testvalue|
| B  |testvalue|

类型A到主题测试。类型 B 到主题 testB。

4

2 回答 2

1

使用最新版本的 Spark,您只需topic在数据框中创建一个列,用于将记录定向到相应的主题。

在你的情况下,这意味着你可以做类似的事情

testdf 
  .withColumn("topic", when(f.col("value") == "A", lit("test")).otherwise(lit("testB"))
  .selectExpr("CAST(value as STRING) as value", "topic") 
  .writeStream .format("kafka") 
  .option("checkpointLocation", "/checkpoint_1") 
  .option("kafka.bootstrap.servers","~~:9092")
  .start()
于 2021-03-05T03:24:44.713 回答
0

谢谢迈克。我能够通过运行以下代码来实现这一点!

(
testdf 
  .withColumn("topic",f.when(f.col("testTime") == "A", f.lit("test")).otherwise(("testB")))
  .selectExpr("CAST(value as STRING) as value", "topic") 
  .writeStream
  .format("kafka") 
  .option("checkpointLocation", "/checkpoint_2") 
  .option("startingOffsets", "latest")
  .option("kafka.bootstrap.servers","9092")
  .start()
)
于 2021-03-05T07:55:31.630 回答