- I'm hitting the following error when appending AVRO files from GCS to a table. The avro files themselves are valid, but we are using deflate-compressed avro. Could that be the problem?
Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/avro/InvalidAvroMagicException
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
    at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
    at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
    at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
    at org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:149)
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:343)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:163)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275)
    at com.snapchat.transformer.TransformerStreamingWorker.lambda$execute$d121240d$1(TransformerStreamingWorker.java:162)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2(JavaDStreamLike.scala:280)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2$adapted(JavaDStreamLike.scala:280)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.InvalidAvroMagicException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 38 more
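Note: org.apache.avro.InvalidAvroMagicException only exists in Avro 1.9.0 and later (the line Iceberg 0.11 builds against), while Spark 3.0 ships Avro 1.8.2, so the NoClassDefFoundError looks like an Avro version conflict on the classpath rather than anything to do with deflate compression. A minimal diagnostic sketch (the class wrapper is illustrative; run it with the same classpath as the Spark job):

import org.apache.avro.file.DataFileReader;

public class AvroVersionCheck {
    public static void main(String[] args) throws Exception {
        // Print which jar the Avro file-reader class is loaded from;
        // an avro-1.8.x jar here would explain the failure above.
        System.out.println(DataFileReader.class.getProtectionDomain()
            .getCodeSource().getLocation());
        // Added in Avro 1.9.0; throws ClassNotFoundException on older Avro,
        // which is exactly the "Caused by" in the stack trace.
        Class.forName("org.apache.avro.InvalidAvroMagicException");
        System.out.println("Avro >= 1.9.0 is on the classpath");
    }
}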
- The logs say the Iceberg table already exists, but I can't see any metadata files in GCS. I'm running the Spark job from a Dataproc cluster; where should I look for the metadata files?
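For reference, with HadoopTables the table location is the path string passed to create/load, and metadata files are written under <table-location>/metadata. A small sketch to print where a loaded table actually keeps its current metadata file (the gs:// path is a hypothetical placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class PrintMetadataLocation {
    public static void main(String[] args) {
        HadoopTables tables = new HadoopTables(new Configuration());
        Table table = tables.load("gs://my-bucket/warehouse/transformer/my_stream"); // hypothetical
        System.out.println(table.location());  // table root directory
        // Current metadata file, e.g. <root>/metadata/v2.metadata.json
        System.out.println(((HasTableOperations) table)
            .operations().current().metadataFileLocation());
    }
}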
##################### Iceberg version: 0.11, Spark version: 3.0 #####################
public void appendData(List<FileMetadata> publishedFiles, Schema icebergSchema) {
    TableIdentifier tableIdentifier = TableIdentifier.of(TRANSFORMER, jobConfig.streamName());
    // PartitionSpec partitionSpec = IcebergInternalFields.getPartitionSpec(tableSchema);
    HadoopTables tables = new HadoopTables(new Configuration());
    // Unpartitioned spec built from the incoming schema.
    PartitionSpec partitionSpec = PartitionSpec.builderFor(icebergSchema).build();
    Table table;
    if (tables.exists(tableIdentifier.name())) {
        table = tables.load(tableIdentifier.name());
    } else {
        table = tables.create(icebergSchema, partitionSpec, tableIdentifier.name());
    }
    // Register each already-written AVRO file with the table in a single append.
    AppendFiles appendFiles = table.newAppend();
    for (FileMetadata fileMetadata : publishedFiles) {
        appendFiles.appendFile(DataFiles.builder(partitionSpec)
            .withPath(fileMetadata.getFilename())
            .withFileSizeInBytes(fileMetadata.getFileSize())
            .withRecordCount(fileMetadata.getRowCount())
            .withFormat(FileFormat.AVRO)
            .build());
    }
    appendFiles.commit();
}
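One thing worth double-checking in the snippet above (a guess, not a confirmed fix): HadoopTables.exists/load/create take a filesystem location, not a catalog name, so passing tableIdentifier.name() resolves the bare stream name against the cluster's default filesystem (HDFS on a Dataproc worker), which could be why no metadata shows up in GCS. A sketch with an explicit location (bucket and path are hypothetical):

String tableLocation = "gs://my-warehouse-bucket/transformer/" + jobConfig.streamName(); // hypothetical bucket
HadoopTables tables = new HadoopTables(new Configuration());
Table table = tables.exists(tableLocation)
    ? tables.load(tableLocation)
    : tables.create(icebergSchema, PartitionSpec.builderFor(icebergSchema).build(), tableLocation);
// Metadata files should then appear under gs://my-warehouse-bucket/transformer/<stream>/metadata/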