0

对于我的 spark 2.1.1 和 Kafka 0.10.2.1 结构化流示例,我能够通过接收foreach器工作。我的流源配置为每 10 秒推送 2 条消息。

我看到前几条消息通过接收foreach器(打开 - 过程 - 关闭)构造很好。但是,在第一次推送之后,该进程不再从队列中读取?

有任何想法吗 ?

val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka10.broker")
  .option("subscribe", src_topic)
  .load()

val df = stream.selectExpr("cast (value as string) as json")

  .select(functions.from_json(StringToColumn(StringContext.apply("json")).$(), txnSchema).as("data"))
          .select("data.*")

编写器实现:

 val writer = new ForeachWriter[Row] {
      /*
      Prepare Hbase connection
      Prepare Kafka Producer
      */
        true
      }

     override def process(row: Row) = {
         try {
         /*
            Do biz logic
            get data from hbase
            at the end, write to a kafka queue
         */
         }
         catch {
            case tt: Throwable => {
            rlog("Something else wierd happened.")
            }
         }
      }

  override def close(errorOrNull: Throwable) = {
    println("-------------------------------In close now. checking whether it was called due to soem error")
    if(errorOrNull != null)
      errorOrNull .printStackTrace()

    println("-------------------------------Closing hbase Connection")
    //closing hbase connections

    println("-------------------------------Closing Kafka connection now")
    // closing kafka producer object

   // Other cleanup
    println("-------------------------------All done. Exiting now.")
  }
}

在我的业务逻辑处理过程中,我需要将数据从 HBase 转换为数据帧以进行进一步处理。我这样做的尝试失败了。另一位绅士提到它不是“允许的”,因为那部分是在执行程序上运行的。

对此有何建议?

4

0 回答 0