
I am new to Gobblin and am trying to ingest data from Kafka into HDFS. I was able to run the Kafka-HDFS ingestion example successfully, but now I need to add a time-based writer partitioner to my job. I went through the TimeBasedWriterPartitioner thread on the Gobblin Google group and put together the following solution based on Zongjun's suggestion.

  1. I created a separate Java project for the time-based writer partitioner class:
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

public class LogJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {
    public LogJsonWriterPartitioner(gobblin.configuration.State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] payload) {
        return System.currentTimeMillis();
    }
}

POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.pm.data.gobblin.kafka</groupId>
    <artifactId>LogJsonWriterPartitioner </artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.linkedin.gobblin</groupId>
            <artifactId>gobblin-api</artifactId>
            <version>0.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.linkedin.gobblin</groupId>
            <artifactId>gobblin-core</artifactId>
            <version>0.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.9.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
  2. Built a jar from the project above and copied it to the gobblin-dist/lib directory.
  3. Updated gobblin-mapreduce.sh in the gobblin-dist/bin directory, adding the new jar name under LIBJARS.
  4. Created a job file as follows:
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
fs.uri=file:///

kafka.brokers=localhost:9092

source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka

writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.partitioner.class=com.pm.data.gobblin.kafka.LogJsonWriterPartitioner
writer.partition.granularity=day
writer.partition.pattern=YYYY-MM-dd
writer.partition.timezone=UTC
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt

data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
data.publisher.replace.final.dir=false
data.publisher.final.dir=/home/myuser/Desktop/Gobblin

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=${gobblin.cluster.work.dir}/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest
  5. Then I started Gobblin in standalone mode using the gobblin-standalone.sh script in the bin directory.

I am getting the following error in logs/gobblin-current.log:

 org.apache.gobblin.runtime.fork.Fork  250 - Fork 0 of task task_GobblinKafkaQuickStart_1590391135660_0 failed to process data records. Set throwable in holder org.apache.gobblin.runtime.ForkThrowableHolder@433cf3c0
java.io.IOException: java.lang.ClassNotFoundException: com.pm.data.logging.gobblin.LogJsonWriterPartitioner
    at org.apache.gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:135)
    at org.apache.gobblin.runtime.fork.Fork.buildWriter(Fork.java:534)
    at org.apache.gobblin.runtime.fork.Fork.buildWriterIfNotPresent(Fork.java:542)
    at org.apache.gobblin.runtime.fork.Fork.processRecord(Fork.java:502)
    at org.apache.gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:103)
    at org.apache.gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:86)
    at org.apache.gobblin.runtime.fork.Fork.run(Fork.java:243)
    at org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.pm.data.logging.gobblin.LogJsonWriterPartitioner
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:128)
    ... 12 more

When I change the job file to writer.partitioner.class=LogJsonWriterPartitioner, the error changes to java.lang.NoClassDefFoundError: gobblin/writer/partitioner/TimeBasedWriterPartitioner.

Could someone help me get past this issue?


1 Answer


For the first issue, make sure LogJsonWriterPartitioner has the correct package statement; I would expect it to be package com.pm.data.logging.gobblin.
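
For illustration, here is a minimal sketch of the class with that package declaration, assuming the job file then references it by the matching fully-qualified name com.pm.data.logging.gobblin.LogJsonWriterPartitioner (the name shown in the stack trace); the rest of the class is unchanged from the question:

package com.pm.data.logging.gobblin;

import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

// The package must match the fully-qualified name configured in
// writer.partitioner.class, otherwise PartitionedDataWriter cannot load the class.
public class LogJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {
    public LogJsonWriterPartitioner(gobblin.configuration.State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] payload) {
        return System.currentTimeMillis();
    }
}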

Second, the dependencies in your pom.xml look wrong, which is why TimeBasedWriterPartitioner cannot be loaded. com.linkedin.gobblin was renamed to org.apache.gobblin a long time ago, with higher version numbers; the most recent release is 0.14.0.
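
For example, the dependency block could be switched to the Apache coordinates roughly as follows (the version is only an assumption; use whatever release matches your gobblin-dist):

<!-- groupId renamed from com.linkedin.gobblin to org.apache.gobblin -->
<dependency>
    <groupId>org.apache.gobblin</groupId>
    <artifactId>gobblin-api</artifactId>
    <version>0.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.gobblin</groupId>
    <artifactId>gobblin-core</artifactId>
    <version>0.14.0</version>
</dependency>

With these artifacts the base class lives under org.apache.gobblin.writer.partitioner, so the import in LogJsonWriterPartitioner would need to be updated to org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner as well; that mismatch is consistent with the NoClassDefFoundError for gobblin/writer/partitioner/TimeBasedWriterPartitioner you are seeing.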

Answered 2020-05-25T18:01:53.210