0
  1. 将 AVRO 文件从 GCS 附加到表时遇到以下错误。avro 文件是有效的,但我们使用的是放气的 avro,这是一个问题吗?

线程“streaming-job-executor-0”中的异常 java.lang.NoClassDefFoundError: org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101) at org.apache.iceberg 的 org/apache/avro/InvalidAvroMagicException .avro.AvroIterable.iterator(AvroIterable.java:77) 在 org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37) 在 org.apache.iceberg.relocated.com.google.common.collect.Iterables .addAll(Iterables.java:320) 在 org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237) 在 org.apache.iceberg.ManifestLists.read(ManifestLists.java :46) 在 org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127) 在 org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:149) 在 org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer. java:343) 在 org.apache.iceberg。SnapshotProducer.apply(SnapshotProducer.java:163) at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276) at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404 ) 在 org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197) 在 org.apache.iceberg 的 org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)。 util.Tasks$Builder.run(Tasks.java:189) at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275) at com.snapchat.transformer.TransformerStreamingWorker.lambda$execute$d121240d$1(TransformerStreamingWorker.java :162) 在 org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2 的 org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2(JavaDStreamLike.scala:280) $adapted(JavaDStreamLike.scala:280) 在 org.apache.spark.streaming.dstream。ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.streaming.dstream。 DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51) at scala.runtime.java8.JFunction0$mcV$sp.apply( JFunction0$mcV$sp.java:23) at scala.util.Try$.apply(Try.scala:213) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org. apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257) 在 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) 在 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257) 在 java.util。concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java .lang.ClassNotFoundException: org.apache.avro.InvalidAvroMagicException 在 java.lang.ClassLoader.loadClass 的 java.lang.ClassLoader.loadClass(ClassLoader.java:418) 的 java.net.URLClassLoader.findClass(URLClassLoader.java:382) (ClassLoader.java:351) ... 38 更多418) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 38 更多418) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 38 更多

  1. 日志显示冰山表的表已经存在,但是我无法在 gcs 中看到元数据文件?我正在从 dataproc 集群运行 spark 作业,我在哪里可以看到元数据文件?

##################### Iceberg 版本:0.11 spark 版本 3.0 #####################

public void appendData(List<FileMetadata> publishedFiles, Schema icebergSchema) {
    TableIdentifier tableIdentifier = TableIdentifier.of(TRANSFORMER, jobConfig.streamName());
    // PartitionSpec partitionSpec = IcebergInternalFields.getPartitionSpec(tableSchema);
    HadoopTables tables = new HadoopTables(new Configuration());

   
    PartitionSpec partitionSpec = PartitionSpec.builderFor(icebergSchema)
            .build();

    Table table = null;
    if (tables.exists(tableIdentifier.name())) {
        table = tables.load(tableIdentifier.name());
    } else {
        table = tables.create(
                icebergSchema,
                partitionSpec,
                tableIdentifier.name());
    }
    AppendFiles appendFiles = table.newAppend();
    for (FileMetadata fileMetadata : publishedFiles) {

        appendFiles.appendFile(DataFiles.builder(partitionSpec)
                .withPath(fileMetadata.getFilename())
                .withFileSizeInBytes(fileMetadata.getFileSize())
                .withRecordCount(fileMetadata.getRowCount())
                .withFormat(FileFormat.AVRO)
                .build());
    }
    appendFiles.commit();
}
4

1 回答 1

0

以下两件事解决了我的问题

  • 确保我为冰山表提供了正确的路径名(在我的情况下带有 gs:// 前缀)

  • 解决了 apache.avro 依赖版本冲突

于 2021-02-03T06:10:16.100 回答