I am trying to create an Iceberg table using the Hive metastore, S3, the Java API, and Spark. I couldn't find a working PoC or any GitHub repository, so the code below is pieced together from several sources.

My code:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    // Wire the Iceberg SQL extensions and a Hive-backed session catalog into Spark,
    // and point the S3A filesystem at the object store.
    SparkConf conf = new SparkConf();
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog");
    conf.set("spark.sql.catalog.spark_catalog.type", "hive");
    conf.set("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083");
    conf.set("spark.hadoop.hive.metastore.warehouse.dir", "s3a://resource-fvlelydg/warehouse/");
    conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "true");
    conf.set("spark.hadoop.fs.s3a.path.style.access", "true");
    conf.set("spark.hadoop.fs.s3a.endpoint", "object storage endpoint");
    conf.set("spark.hadoop.fs.s3a.access.key", "key");
    conf.set("spark.hadoop.fs.s3a.secret.key", "secret");
    conf.set("spark.hadoop.fs.s3a.attempts.maximum", "1");
    conf.set("spark.hadoop.fs.s3a.connection.establish.timeout", "500");
    conf.set("spark.hadoop.datanucleus.autoCreateSchema", "true");
    conf.set("spark.hadoop.datanucleus.fixedDatastore", "false");

    SparkSession session = SparkSession.builder()
            .appName("spark-test")
            .master("local[*]")
            .config(conf)
            .getOrCreate();

    TableIdentifier name = TableIdentifier.of("logging", "logs");

    // Stand-alone Iceberg HiveCatalog, configured using Spark's Hadoop configuration.
    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(session.sparkContext().hadoopConfiguration());

    Map<String, String> properties = new HashMap<>();
    properties.put("uri", "thrift://localhost:9083");
    catalog.initialize("hive", properties);

    // Table schema; every field ID must be unique, including the list element (ID 5).
    Schema schema = new Schema(
            Types.NestedField.required(1, "level", Types.StringType.get()),
            Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
            Types.NestedField.required(3, "message", Types.StringType.get()),
            Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get())));

    // Partition by hour of event_time, then by level.
    PartitionSpec spec = PartitionSpec.builderFor(schema).hour("event_time").identity("level").build();

    Table table = catalog.createTable(name, schema, spec);
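
For reference, once the session actually starts I also want to try creating the same table through Spark SQL DDL instead of the HiveCatalog Java API. A rough sketch of what I have in mind (untested, since the session never comes up; it assumes the Iceberg extensions and the spark_catalog settings above actually load):

    // Hypothetical DDL equivalent of the catalog.createTable() call above.
    session.sql("CREATE TABLE spark_catalog.logging.logs ("
            + "level string, "
            + "event_time timestamp, "
            + "message string, "
            + "call_stack array<string>) "
            + "USING iceberg "
            + "PARTITIONED BY (hours(event_time), level)");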

Error log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/01/01 03:07:41 WARN Utils: Your hostname, zeromsi resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface wlo1)
22/01/01 03:07:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/01/01 03:07:41 INFO SparkContext: Running Spark version 2.3.0
22/01/01 03:07:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/01 03:07:41 INFO SparkContext: Submitted application: spark-test
22/01/01 03:07:41 INFO SecurityManager: Changing view acls to: zeromsi
22/01/01 03:07:41 INFO SecurityManager: Changing modify acls to: zeromsi
22/01/01 03:07:41 INFO SecurityManager: Changing view acls groups to: 
22/01/01 03:07:41 INFO SecurityManager: Changing modify acls groups to: 
22/01/01 03:07:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(zeromsi); groups with view permissions: Set(); users  with modify permissions: Set(zeromsi); groups with modify permissions: Set()
22/01/01 03:07:42 INFO Utils: Successfully started service 'sparkDriver' on port 46797.
22/01/01 03:07:42 INFO SparkEnv: Registering MapOutputTracker
22/01/01 03:07:42 INFO SparkEnv: Registering BlockManagerMaster
22/01/01 03:07:42 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/01/01 03:07:42 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/01/01 03:07:42 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-38927f9c-b854-49cf-8c8f-cac4f203c63f
22/01/01 03:07:42 INFO MemoryStore: MemoryStore started with capacity 1594.8 MB
22/01/01 03:07:42 INFO SparkEnv: Registering OutputCommitCoordinator
22/01/01 03:07:42 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/01/01 03:07:42 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.107:4040
22/01/01 03:07:42 INFO Executor: Starting executor ID driver on host localhost
22/01/01 03:07:42 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45217.
22/01/01 03:07:42 INFO NettyBlockTransferService: Server created on 192.168.0.107:45217
22/01/01 03:07:42 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/01/01 03:07:42 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.107, 45217, None)
22/01/01 03:07:42 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.107:45217 with 1594.8 MB RAM, BlockManagerId(driver, 192.168.0.107, 45217, None)
22/01/01 03:07:42 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.107, 45217, None)
22/01/01 03:07:42 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.107, 45217, None)
Exception in thread "main" java.lang.NoSuchMethodError: 'void scala.Function1.$init$(scala.Function1)'
    at org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions.<init>(IcebergSparkSessionExtensions.scala:35)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.ReflectAccess.newInstance(ReflectAccess.java:128)
    at java.base/jdk.internal.reflect.ReflectionFactory.newInstance(ReflectionFactory.java:347)
    at java.base/java.lang.Class.newInstance(Class.java:645)
    at org.apache.spark.sql.SparkSession$Builder.liftedTree1$1(SparkSession.scala:940)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:938)
    at com.msi.spark.Maiin.main(Maiin.java:41)
22/01/01 03:07:42 INFO SparkContext: Invoking stop() from shutdown hook
22/01/01 03:07:42 INFO SparkUI: Stopped Spark web UI at http://192.168.0.107:4040
22/01/01 03:07:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/01/01 03:07:42 INFO MemoryStore: MemoryStore cleared
22/01/01 03:07:42 INFO BlockManager: BlockManager stopped
22/01/01 03:07:42 INFO BlockManagerMaster: BlockManagerMaster stopped
22/01/01 03:07:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/01/01 03:07:42 INFO SparkContext: Successfully stopped SparkContext
22/01/01 03:07:42 INFO ShutdownHookManager: Shutdown hook called
22/01/01 03:07:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-a5555e94-2ad6-4b2a-b581-1aaa4afb6c3a

The error is thrown while creating the SparkSession.
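
From searching around, NoSuchMethodError: scala.Function1.$init$ usually points at mixed Scala binary versions: Spark 2.3.0 ships with Scala 2.11, while the Iceberg SQL extensions appear to be built for Spark 3.x / Scala 2.12. As a sanity check (a plain-Java sketch, nothing Iceberg-specific), I'm printing which Scala runtime and Spark jar actually end up on the classpath before building the session:

    // Classpath sanity check: which Scala runtime and which Spark jar are loaded?
    System.out.println("Scala: " + scala.util.Properties.versionNumberString());
    System.out.println("Spark jar: " + org.apache.spark.sql.SparkSession.class
            .getProtectionDomain().getCodeSource().getLocation());

If they disagree, I assume I need to align everything on Spark 3.x / Scala 2.12 plus the matching Iceberg Spark runtime, but I haven't found a confirmed-working combination yet. Running on Java 9 may be a separate problem, since Spark 2.3.x predates Java 9 support.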

Java:

  • Java 9

I'm not sure whether I'm creating the table the right way. Please help with suggestions, or point me to any working Maven and Java configuration.
