Suppose my Kafka topic receives records like these:
CHANNEL | VIEWERS | .....
ABC     | 100     | .....
CBS     | 200     | .....
I have Spark Structured Streaming code that reads and processes the Kafka records as follows:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val dataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
  .option("subscribe", "partition_test")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING)")
// I then use a custom UDF to transform each value into a specific object
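For reference, the transformation step is along these lines (only a simplified sketch: the testRec fields and the pipe-delimited value format here are assumptions standing in for my real parsing logic):

// Hypothetical record type; the real one has more fields.
case class testRec(channel: String, viewers: Int)

// Assuming each Kafka value is a pipe-delimited string like "ABC|100";
// in my actual code this parsing is done by a custom UDF.
val records = dataFrame
  .as[String]
  .map { line =>
    val parts = line.split('|')
    testRec(parts(0).trim, parts(1).trim.toInt)
  }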
Currently, I process the records with a ForeachWriter as follows:
import org.apache.spark.sql.ForeachWriter

val writer = new ForeachWriter[testRec] {
  // Called once per partition and epoch; returning true accepts the partition.
  def open(partitionId: Long, version: Long): Boolean = {
    true
  }
  def process(record: testRec): Unit = {
    handle(record)
  }
  def close(errorOrNull: Throwable): Unit = {
  }
}
// .foreach(writer) sets the sink, so a separate .format("console") is redundant
val query = dataFrame.writeStream
  .foreach(writer)
  .outputMode("append")
  .start()
The code works fine. However, what I would like to do is partition the incoming data by channel, so that each worker is responsible for specific channels, and inside the handle() block I can do in-memory computations tied to that channel. Is that possible? If so, how do I do it?
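To make the intent concrete, this is the kind of approach I have been imagining (only a sketch I have not verified on a real cluster; records is the parsed Dataset[testRec] from the earlier sketch):

// Hash-partition each micro-batch by channel before the foreach sink, so
// that within a batch all records for a given channel land in the same
// partition, and hence in the same ForeachWriter instance.
val partitioned = records.repartition($"channel")

val query = partitioned.writeStream
  .foreach(writer)
  .outputMode("append")
  .start()

My understanding is that open(partitionId, version) would then see all records of one channel together within each micro-batch, but I am unsure whether a channel maps to the same partitionId across batches, which is what my in-memory state would need. An alternative I have considered is keying the messages by channel on the Kafka producer side, so that Kafka's own partitioning does the grouping before Spark ever reads the data.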