I'm using an Amazon Kinesis Data Analytics Java Flink application that reads data from Firehose and tries to write it out as a series of Parquet files to an S3 bucket. I'm hitting the following exception in my CloudWatch logs, and it's the only error I can see that looks relevant.
I have enabled checkpointing as specified in the documentation and included the flink/avro dependencies. Running it locally works: when a checkpoint is reached, the Parquet files are written to the local disk.
Exception:
"message": "Exception type is USER from filter results [UserClassLoaderExceptionFilter -> USER, UserAPIExceptionFilter -> SKIPPED, UserSerializationExceptionFilter -> SKIPPED, UserFunctionExceptionFilter -> SKIPPED, OutOfMemoryExceptionFilter -> NONE, TooManyOpenFilesExceptionFilter -> NONE, KinesisServiceExceptionFilter -> NONE].",
"throwableInformation": [
"java.lang.Exception: Error while triggering checkpoint 1360 for Source: Custom Source -> Map -> Sink: HelloS3 (1/1)",
"org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1201)",
"java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)",
"java.util.concurrent.FutureTask.run(FutureTask.java:266)",
"java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)",
"java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)",
"java.lang.Thread.run(Thread.java:748)",
"Caused by: java.lang.AbstractMethodError: org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V",
"org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)",
"org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)",
"org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:152)",
"org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:27)",
"org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)",
"org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)",
"org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)",
"org.apache.flink.formats.parquet.ParquetBulkWriter.finish(ParquetBulkWriter.java:62)",
"org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:62)",
"org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:235)",
"org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:276)",
"org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:249)",
"org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244)",
"org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235)",
"org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347)",
"org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)",
"org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)",
"org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)",
"org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)",
"org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1138)",
"org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1080)",
"org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:754)",
"org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)",
"org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:584)",
"org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:114)",
"org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1190)",
"\t... 5 more"
Below is a snippet of my code. I do see my log statements while events are being processed, even the ones coming from the BucketAssigner.
env.setStateBackend(new FsStateBackend("s3a://<BUCKET>/checkpoint"));
env.setParallelism(1);
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
StreamingFileSink<Metric> sink = StreamingFileSink
    .forBulkFormat(new Path("s3a://<BUCKET>/raw"), ParquetAvroWriters.forReflectRecord(Metric.class))
    .withBucketAssigner(new EventTimeBucketAssigner())
    .build();
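For context, here is a simplified sketch of how that sink is attached to the rest of the job; the source and the map stage are placeholders for my actual code, not the real method names:

// Simplified wiring around the sink shown above.
// createFirehoseSource() and parseMetric() are placeholders, not the real
// methods from my application.
DataStream<Metric> metrics = env
        .addSource(createFirehoseSource())    // custom source reading the Firehose-fed stream
        .map(record -> parseMetric(record));  // convert raw records into Metric objects

metrics.addSink(sink).name("HelloS3");        // matches "Sink: HelloS3" in the exception
env.execute();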
My pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.11-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.1</version>
</dependency>
My AWS configuration has "Snapshots" enabled. Write access to the bucket works when I use row writing instead of bulk writing.
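To clarify what I mean by row writing, something along these lines does write files to the same bucket (sketch only; SimpleStringEncoder stands in for the actual row encoder I used):

// Row-format variant of the sink, which successfully writes to the same bucket.
// SimpleStringEncoder is only an illustrative stand-in for the encoder I actually used.
StreamingFileSink<Metric> rowSink = StreamingFileSink
    .forRowFormat(new Path("s3a://<BUCKET>/raw"), new SimpleStringEncoder<Metric>("UTF-8"))
    .withBucketAssigner(new EventTimeBucketAssigner())
    .build();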
I'm really not sure what to look into next to get this working.