对于我的 spark 2.1.1 和 Kafka 0.10.2.1 结构化流示例,我能够通过接收foreach
器工作。我的流源配置为每 10 秒推送 2 条消息。
我看到前几条消息通过接收foreach
器(打开 - 过程 - 关闭)构造很好。但是,在第一次推送之后,该进程不再从队列中读取?
有任何想法吗 ?
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka10.broker")
.option("subscribe", src_topic)
.load()
val df = stream.selectExpr("cast (value as string) as json")
.select(functions.from_json(StringToColumn(StringContext.apply("json")).$(), txnSchema).as("data"))
.select("data.*")
编写器实现:
val writer = new ForeachWriter[Row] {
/*
Prepare Hbase connection
Prepare Kafka Producer
*/
true
}
override def process(row: Row) = {
try {
/*
Do biz logic
get data from hbase
at the end, write to a kafka queue
*/
}
catch {
case tt: Throwable => {
rlog("Something else wierd happened.")
}
}
}
override def close(errorOrNull: Throwable) = {
println("-------------------------------In close now. checking whether it was called due to soem error")
if(errorOrNull != null)
errorOrNull .printStackTrace()
println("-------------------------------Closing hbase Connection")
//closing hbase connections
println("-------------------------------Closing Kafka connection now")
// closing kafka producer object
// Other cleanup
println("-------------------------------All done. Exiting now.")
}
}
在我的业务逻辑处理过程中,我需要将数据从 HBase 转换为数据帧以进行进一步处理。我这样做的尝试失败了。另一位绅士提到它不是“允许的”,因为那部分是在执行程序上运行的。
对此有何建议?