7

Spark DStream 有mapPartitionAPI,而 Flink DataStreamAPI 没有。有没有人可以帮忙解释一下原因。我想做的是reduceByKey在 Flink 上实现一个类似于 Spark 的 API。

4

2 回答 2

7

Flink 的流处理模型与以小批量为中心的 Spark Streaming 有很大不同。在 Spark Streaming 中,每个 mini batch 都像常规批处理程序一样在有限的数据集上执行,而 Flink DataStream 程序连续处理记录。

在 Flink 的 DataSet API 中,aMapPartitionFunction有两个参数。输入的迭代器和函数结果的收集器。Flink DataStream 程序中的AMapPartitionFunction永远不会从第一个函数调用中返回,因为迭代器会遍历无穷无尽的记录流。但是,Flink 的内部流处理模型要求用户函数返回以检查函数状态。因此,DataStream API 不提供mapPartition转换。

为了实现类似于 Spark Streaming 的功能reduceByKey,您需要在流上定义一个键控窗口。Windows 将流离散化,这有点类似于小批量,但 Windows 提供了更多的灵活性。由于窗口的大小是有限的,因此您可以调用reduce该窗口。

这可能看起来像:

yourStream.keyBy("myKey") // organize stream by key "myKey"
          .timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
          .reduce(new YourReduceFunction); // apply a reduce function on each window

DataStream 文档展示了如何定义各种窗口类型并解释了所有可用的功能。

注意:最近对 DataStream API 进行了重新设计。该示例假定最新版本 (0.10-SNAPSHOT) 将在未来几天发布为 0.10.0。

于 2015-10-28T21:16:29.287 回答
0

假设您的输入流是单分区数据(比如字符串)

val new_number_of_partitions = 4

//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)

//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
  // var local_val_to_different_part : Type = null
  var myTaskId : Int = null

  //below function is executed once for each mapper function (one mapper per partition)
  override def open(config: Configuration): Unit = {
    myTaskId = getRuntimeContext.getIndexOfThisSubtask
    //do whatever initialization you want to do. read from data sources..
  }

  def map(value: String): (String, Int) = {
    (value, myTasKId)
  }
})

val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function

Flink 流是纯流(不是批处理的)。看看迭代 API。

于 2016-04-12T06:10:24.853 回答