1

我有一个使用 Kafka 日志并将其写入 HDFS 的 Apache Apex 应用程序。

DAG 非常简单,有一个 Kafka 消费者(20 个 2 GB 内存的分区用于操作员)通过流连接到“MyWriter extends AbstractFileOutputOperator”。

问题: 1.我看到Writer多次重复写入相同大小和相同数据的.tmp文件。我已经尝试增加写入操作符的内存,增加写入器的分区数量等。这个问题仍然存在。

我尝试向 MyWriter 添加/删除 requestFinalize。还是同样的问题。

 @Override
    public void endWindow()
    {
        if (null != fileName) {
            requestFinalize(fileName);
        }
        super.endWindow();
    }

这是我的 properties.xml 的一个子集

<property>
    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
    <value>1000</value>
  </property>

  <property>
    <name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name>
    <value>60</value>
  </property>

  <property>
    <name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name>
     <value>60</value>
  </property>

 <property>
        <name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name>
        <value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value>
    </property>

  <property>
    <name>dt.application.myapp.operator.myWriter.prop.maxLength</name>
    <value>1000000000</value> <!-- 1 GB File -->
  </property>

这是我能够从 dt.log 为操作员获取的堆栈跟踪:操作员可能在不同的容器中重新部署,抛出此异常并继续写入重复文件。

 java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
        at com.datatorrent.stram.engine.Node.setup(Node.java:187)
        at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
        at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
    Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
        ... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
4

1 回答 1

1

基本运算符的代码位于以下链接中,并在以下评论中引用: https ://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib /io/fs/AbstractFileOutputOperator.java

通过将最大文件大小设置为 1GB,您将自动启用滚动文件;相关领域是:

protected Long maxLength = Long.MAX_VALUE;
protected transient boolean rollingFile = false;

setup()如果前者的值小于 的默认值,则后者在方法中设置为 true Long.MAX_VALUE

启用滚动文件后,文件最终化会自动完成,因此您不应调用requestFinalize().

其次,在您的MyWriter类中,删除endWindow()覆盖并确保创建所需的文件名,其中包括方法中的操作员 ID,并在覆盖setup()中返回此文件名;getFileName()这样可以确保多个分区器不会互相踩踏。例如:

@NotNull
private String fileName;           // current base file name

private transient String fName;    // per partition file name

@Override
public void setup(Context.OperatorContext context)
{
  // create file name for this partition by appending the operator id to
  // the base name
  //
  long id = context.getId();
  fName = fileName + "_p" + id;
  super.setup(context);

  LOG.debug("Leaving setup, fName = {}, id = {}", fName, id);
}

@Override
protected String getFileName(Long[] tuple)
{
  return fName;
}

文件基本名称(fileName在上面的代码中)可以直接在代码中设置或从 XML 文件中的属性初始化(您还需要为其添加 getter 和 setter)。

您可以在以下位置查看此类用法的示例: https ://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput

几个额外的建议:

  1. 将分区计数设置为 1(或注释掉设置PARTITIONER属性的 XML)并确保一切按预期工作。这将消除任何与分区无关的问题。如果可能,还可以将最大文件大小减小到 2K 或 4K,以便测试更容易。
  2. 一旦单个分区的情况起作用,将分区的数量增加到 2。如果这样做,任意更大的数字(在合理范围内)也应该起作用。
于 2016-08-19T14:36:42.523 回答