我正在尝试通过 Apache Iceberg 0.9.1 将简单数据写入表中,但显示错误消息。我想直接通过 Hadoop 对数据进行 CRUD。我创建了一个 hadooptable ,并尝试从表中读取。之后我尝试将数据写入表中。我准备了一个包含一行的 json 文件。我的代码已经读取了json对象,并排列了数据的顺序,但是最后一步写入数据总是出错。我已经更改了某些版本的依赖包,但显示了另一个错误消息。包的版本是否有问题。请帮我。
这是我的源代码:
public class IcebergTest {
public static void main(String[] args) {
testWithoutCatalog();
readDataWithouCatalog();
writeDataWithoutCatalog();
}
public static void testWithoutCatalog() {
Schema bookSchema = new Schema(optional(1, "title", Types.StringType.get()),
optional(2, "price", Types.LongType.get()),
optional(3, "author", Types.StringType.get()),
optional(4, "genre", Types.StringType.get()));
PartitionSpec bookspec = PartitionSpec.builderFor(bookSchema).identity("title").build();
Configuration conf = new Configuration();
String warehousePath = "hdfs://hadoop01:9000/warehouse_path/xgfying/books3";
HadoopTables tables = new HadoopTables(conf);
Table table = tables.create(bookSchema, bookspec, warehousePath);
}
public static void readDataWithouCatalog(){
.......
}
public static void writeDataWithoutCatalog(){
SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
Dataset<Row> df = spark.read().json("src/test/data/books3.json");
System.out.println(" this is the writing data : "+df.select("title","price","author","genre")
.first().toString());
df.select("title","price","author","genre")
.write().format("iceberg").mode("append")
.save("hdfs://hadoop01:9000/warehouse_path/xgfying/books3");
// System.out.println(df.write().format("iceberg").mode("append").toString());
}
}
这是错误消息:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/18 15:51:36 INFO SparkContext: Running Spark version 2.4.5
.......
file:///C:/tmp/icebergtest1/src/test/data/books3.json, range: 0-75, partition values: [empty row]
20/11/18 15:51:52 ERROR Utils: Aborting task
java.lang.ExceptionInInitializerError
at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:232)
at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:61)
at org.apache.iceberg.spark.source.BaseWriter.openCurrent(BaseWriter.java:105)
at org.apache.iceberg.spark.source.PartitionedWriter.write(PartitionedWriter.java:63)
at org.apache.iceberg.spark.source.Writer$Partitioned24Writer.write(Writer.java:271)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Cannot find constructor for interface org.apache.parquet.column.page.PageWriteStore
Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
at org.apache.iceberg.common.DynConstructors$Builder.build(DynConstructors.java:235)
at org.apache.iceberg.parquet.ParquetWriter.<clinit>(ParquetWriter.java:55)
... 19 more
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 2, attempt 0, stage 2.0)
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 2, attempt 0, stage 2.0)
这是我的 pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>icebergtest</groupId>
<artifactId>icebergtest1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>icebergtest1</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<iceberg.version>0.9.1</iceberg.version>
<hadoop.version>2.7.0</hadoop.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- org.apache.hadoop BEGIN-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<!--将netty包排除-->
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--解决io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena()I异常,-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.18.Final</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- org.apache.hadoop END-->
<!-- org.apache.iceberg BEGIN-->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-arrow</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark2</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-pig</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-mr</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- org.apache.iceberg END-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.5</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<!--<version>2.7.9</version>-->
<version>2.6.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<!--<version>2.7.9.4</version>-->
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<!--<version>2.7.9</version>-->
<version>2.6.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>