1

我是 flink 的新手,我有这样的转变假设

val supportTask= customSource

  .map( line => line.split(","))
  .map( line => SupportTaskNew(line(0)toInt,line(1).toString,line(2)toString,line(3)toLong,line(4).toString,line(5)toInt,line(6)toInt))
  .filter(_ => true) //todo put sent date condition
  .map( line => Count(1))
  .keyBy(0)
  .timeWindow(Time.seconds(20)) //todo for time being 10 seconds, actuals 30 min
 .sum(0)  

现在我想为每 20 秒的时间窗口创建文件

supportTask.writeAsText(("D://myfile_"+Calendar.getInstance().get(Calendar.SECOND)),WriteMode.NO_OVERWRITE).setParallelism(1)

我提供了文件名+秒,这样每次创建文件时都会附加秒数。

但是这里只创建了一个文件,我想每 20 秒窗口创建一个新文件,我该怎么做?

4

1 回答 1

2

也许您可以使用Bucketing File Sink和自定义Bucketer.

于 2017-05-29T16:40:45.387 回答