0

我写了一个定制的火花水槽。在我的addBatch方法ForEachPartitionAsync中,如果我没记错的话,我使用它只会使驱动程序异步工作,返回一个未来。

    val work: FutureAction[Unit] = rdd.foreachPartitionAsync { rows =>
       val sourceInfo: StreamSourceInfo = serializeRowsAsInputStream(schema, rows)

       val ackIngestion = Future {
       ingestRows(sourceInfo) } andThen {
       case Success(ingestion) => ackIngestionDone(partitionId, ingestion)
       }

       Await.result(ackIngestion, timeOut) // I would like to remove this line..
    }
    work onSuccess {
      case _ => // move data from temporary table, report success of all workers
   }
        work onFailure{
      //delete tmp data
      case t => throw t.getCause
    }

我找不到在不阻塞 Await 调用的情况下运行工作节点的方法,就好像我删除它们一样,work虽然未来并没有真正完成,但成功报告给未来对象。

有没有办法向司机报告所有工人都完成了他们的异步工作?

注意:我查看了该foreachPartitionAsync函数,它只有一个实现需要一个返回 Unit 的函数(我希望它有另一个返回未来或者可能是 CountDownLatch ..)

4

0 回答 0