我想根据 SparkStreaming 中数据的值更改 Kafka 主题目标以保存数据。有可能再次这样做吗?当我尝试以下代码时,它只执行第一个,但不执行较低的进程。
(testdf
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
)
(testdf
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
)
数据存储在主题名称 test 中。谁能想到一种方法来做到这一点?
我更改了目的地以保存这样的数据框。
|type|value|
| A |testvalue|
| B |testvalue|
类型A到主题测试。类型 B 到主题 testB。