I have an Apache Apex application that consumes logs from Kafka and writes them to HDFS.
The DAG is very simple: a Kafka consumer (20 partitions, each operator partition with 2 GB of memory) is connected by a stream to "MyWriter extends AbstractFileOutputOperator".
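Problem: the writer keeps writing duplicate .tmp files that have the same size and contain the same data. I have tried increasing the writer operator's memory, increasing the number of writer partitions, and so on, but the problem persists.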
I have tried both adding and removing the requestFinalize call in MyWriter; the problem is the same either way:
@Override
public void endWindow()
{
  // Request finalization of the current file, then let the base class end the window
  if (null != fileName) {
    requestFinalize(fileName);
  }
  super.endWindow();
}
Here is a subset of my properties.xml:
<property>
<name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
<value>1000</value>
</property>
<property>
<name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name>
<value>60</value>
</property>
<property>
<name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name>
<value>60</value>
</property>
<property>
<name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name>
<value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value>
</property>
<property>
<name>dt.application.myapp.operator.myWriter.prop.maxLength</name>
<value>1000000000</value> <!-- 1 GB File -->
</property>
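To make the timing implied by these properties explicit, here is a small sketch that just does the arithmetic. It assumes that APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are both counted in streaming windows of STREAMING_WINDOW_SIZE_MILLIS each; the class and method names are hypothetical and not part of Apex.

```java
import java.util.Locale;

// Hypothetical helper: derives durations from the properties.xml values above,
// ASSUMING both window counts are measured in streaming windows.
public class WindowMath {
    static final long STREAMING_WINDOW_SIZE_MILLIS = 1000;
    static final int APPLICATION_WINDOW_COUNT = 60;
    static final int CHECKPOINT_WINDOW_COUNT = 60;
    static final long MAX_LENGTH_BYTES = 1_000_000_000L; // myWriter maxLength

    /** Length in milliseconds of a span of `count` streaming windows. */
    static long spanMillis(long streamingWindowMillis, int count) {
        return streamingWindowMillis * count;
    }

    /** Converts a byte count to GiB (2^30 bytes). */
    static double toGiB(long bytes) {
        return bytes / (double) (1L << 30);
    }

    public static void main(String[] args) {
        long appWindow = spanMillis(STREAMING_WINDOW_SIZE_MILLIS, APPLICATION_WINDOW_COUNT);
        long checkpoint = spanMillis(STREAMING_WINDOW_SIZE_MILLIS, CHECKPOINT_WINDOW_COUNT);
        System.out.println("application window: " + appWindow + " ms");   // 60000 ms
        System.out.println("checkpoint interval: " + checkpoint + " ms");  // 60000 ms
        // Locale.ROOT keeps the decimal separator a '.' regardless of JVM locale
        System.out.println(String.format(Locale.ROOT, "maxLength: %d bytes = %.2f GiB",
                MAX_LENGTH_BYTES, toGiB(MAX_LENGTH_BYTES)));
    }
}
```

Under that assumption, both the application window and the checkpoint interval are 60 seconds, so endWindow (and therefore requestFinalize) runs once per minute, and every application window ends on a checkpoint boundary. Note also that maxLength = 1,000,000,000 is a decimal gigabyte, roughly 0.93 GiB.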
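This is the stack trace I was able to get from dt.log for the operator. The operator may be getting redeployed in a different container; it throws this exception and then keeps writing duplicate files.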
java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
at com.datatorrent.stram.engine.Node.setup(Node.java:187)
at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
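For reference, the missing file's name can be taken apart as follows. The layout `<base>.<part>.<epochMillis>.tmp` is inferred only from the single name in the stack trace above; it is an assumption, not documented AbstractFileOutputOperator behavior, and the TmpFileName class is hypothetical.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits a .tmp file name into the parts it APPEARS to contain,
// based only on the observed name: <base>.<part>.<epochMillis>.tmp
public class TmpFileName {
    // Greedy base; the last two dot-separated numeric segments before ".tmp"
    private static final Pattern TMP = Pattern.compile("^(.+)\\.(\\d+)\\.(\\d+)\\.tmp$");

    final String base;
    final int part;
    final long createdMillis;

    private TmpFileName(String base, int part, long createdMillis) {
        this.base = base;
        this.part = part;
        this.createdMillis = createdMillis;
    }

    static TmpFileName parse(String name) {
        Matcher m = TMP.matcher(name);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a .tmp name: " + name);
        }
        return new TmpFileName(m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3)));
    }

    public static void main(String[] args) {
        String path = "/kafkaconsumetest/inventoryCount/nested/trial2/"
                + "1471489200000_1471489786800_161.0.1471489802786.tmp";
        // Keep only the last path component
        TmpFileName t = parse(path.substring(path.lastIndexOf('/') + 1));
        System.out.println("base:    " + t.base);                             // 1471489200000_1471489786800_161
        System.out.println("part:    " + t.part);                             // 0
        System.out.println("created: " + Instant.ofEpochMilli(t.createdMillis)); // 2016-08-18T03:10:02.786Z
        // Underscore-separated fields of the base name
        for (String field : t.base.split("_")) {
            System.out.println("  field: " + field);
        }
    }
}
```

If the first two base-name fields are also epoch milliseconds, they decode to 2016-08-18T03:00:00Z and 2016-08-18T03:09:46.800Z, and the trailing 161 appears to match the operator ID in the "Undeploy request: [161, 177]" log line.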