
I have developed and successfully deployed a custom batch source plugin for the CDAP platform on Google Data Fusion. The plugin sometimes works in preview mode, but whenever I deploy the pipeline it fails with the following error:

org.apache.tephra.TransactionFailureException: Unable to persist changes of transaction-aware 'RecordGenerator' for transaction 1574271280330000000
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.createTransactionFailure(AbstractTransactionContext.java:313) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.persist(AbstractTransactionContext.java:260) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.finish(AbstractTransactionContext.java:115) ~[na:na]
    at io.cdap.cdap.data2.transaction.Transactions$CacheBasedTransactional.finishExecute(Transactions.java:230) ~[na:na]
    at io.cdap.cdap.data2.transaction.Transactions$CacheBasedTransactional.execute(Transactions.java:211) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:546) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:534) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.BasicSparkClientContext.execute(BasicSparkClientContext.java:333) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.etl.common.submit.SubmitterPlugin.prepareRun(SubmitterPlugin.java:69) ~[na:na]
    at io.cdap.cdap.etl.batch.PipelinePhasePreparer.prepare(PipelinePhasePreparer.java:111) ~[na:na]
    at io.cdap.cdap.etl.spark.batch.SparkPreparer.prepare(SparkPreparer.java:104) ~[na:na]
    at io.cdap.cdap.etl.spark.batch.ETLSpark.initialize(ETLSpark.java:112) ~[na:na]
    at io.cdap.cdap.api.spark.AbstractSpark.initialize(AbstractSpark.java:131) ~[na:na]
    at io.cdap.cdap.api.spark.AbstractSpark.initialize(AbstractSpark.java:33) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$2.initialize(SparkRuntimeService.java:167) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$2.initialize(SparkRuntimeService.java:162) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.lambda$initializeProgram$1(AbstractContext.java:640) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.execute(AbstractContext.java:600) ~[na:na]
    at io.cdap.cdap.internal.app.runtime.AbstractContext.initializeProgram(AbstractContext.java:637) ~[na:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService.initialize(SparkRuntimeService.java:433) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService.startUp(SparkRuntimeService.java:208) ~[io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:47) ~[com.google.guava.guava-13.0.1.jar:na]
    at io.cdap.cdap.app.runtime.spark.SparkRuntimeService$5$1.run(SparkRuntimeService.java:404) [io.cdap.cdap.cdap-spark-core2_2.11-6.1.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
    Suppressed: org.apache.tephra.TransactionFailureException: Unable to roll back changes in transaction-aware 'RecordGenerator' for transaction 1574271280330000000
        at io.cdap.cdap.data2.transaction.AbstractTransactionContext.createTransactionFailure(AbstractTransactionContext.java:313) ~[na:na]
        at io.cdap.cdap.data2.transaction.AbstractTransactionContext.abort(AbstractTransactionContext.java:143) ~[na:na]
        ... 23 common frames omitted
Caused by: java.io.IOException: Database /data/ldb/cdap_default.RecordGenerator.kv does not exist and the create if missing option is disabled
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.openTable(LevelDBTableService.java:218)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.getTable(LevelDBTableService.java:181)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.getDB(LevelDBTableCore.java:80)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.undo(LevelDBTableCore.java:184)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.undoPersisted(LevelDBTable.java:113)
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.undo(LevelDBTable.java:108)
    at io.cdap.cdap.data2.dataset2.lib.table.BufferingTable.rollbackTx(BufferingTable.java:368)
    at io.cdap.cdap.api.dataset.lib.AbstractDataset.rollbackTx(AbstractDataset.java:101)
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.abort(AbstractTransactionContext.java:141)
    ... 23 common frames omitted
Caused by: java.io.IOException: Database /data/ldb/cdap_default.RecordGenerator.kv does not exist and the create if missing option is disabled
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.openTable(LevelDBTableService.java:218) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService.getTable(LevelDBTableService.java:181) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.getDB(LevelDBTableCore.java:80) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore.persist(LevelDBTableCore.java:164) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.persist(LevelDBTable.java:100) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTable.persist(LevelDBTable.java:92) ~[na:na]
    at io.cdap.cdap.data2.dataset2.lib.table.BufferingTable.commitTx(BufferingTable.java:351) ~[na:na]
    at io.cdap.cdap.api.dataset.lib.AbstractDataset.commitTx(AbstractDataset.java:91) ~[na:na]
    at io.cdap.cdap.data2.transaction.AbstractTransactionContext.persist(AbstractTransactionContext.java:255) ~[na:na]
    ... 22 common frames omitted

I believe the error is misleading, because it originates from the following code inside the plugin:

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    pipelineConfigurer.createDataset("RecordGenerator", KeyValueTable.class);
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
}

@Override
@TransactionPolicy(TransactionControl.IMPLICIT)
public void prepareRun(BatchSourceContext context) throws IOException {
    KeyValueTable d = context.getDataset("RecordGenerator");
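    // this write to the KeyValueTable is the line that fails once the pipeline is deployed (see the error above)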
    d.write("numberOfRecords", Long.toString(config.numberOfRecords));
    context.setInput(Input.ofDataset("RecordGenerator"));
}

In particular, the offending line is d.write("numberOfRecords", Long.toString(config.numberOfRecords));. If I remove that line the plugin works, but then part of the transform obviously cannot run.

I am out of ideas. The behavior in preview mode seems inconsistent, and the documentation, where it exists, is sparse to say the least. What can I do to make this work?


1 Answer


KeyValueTable is not supported in Data Fusion. It works in preview because preview runs in local mode. If you want to persist something in the prepareRun() method, you will need to use some other storage. A simple alternative is to use a file on GCS to store the information. Here is a code snippet that can be used to write information to a file: https://github.com/data-integrations/kafka-plugins/blob/release/2.2/kafka-plugins-0.10/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java#L160-L167
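Below is a minimal sketch of that file-based approach using the Hadoop FileSystem API, which resolves gs:// paths on Data Fusion clusters via the GCS connector. The class name, the path, and the way it would be wired into prepareRun() are assumptions for illustration, not code taken from the linked Kafka plugin.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class StateFileWriter {

    // Writes a single string value to the given path, overwriting any existing file.
    // pathStr is a hypothetical location, e.g. "gs://my-bucket/record-generator/numberOfRecords".
    public static void writeValue(String pathStr, String value) throws IOException {
        Path path = new Path(pathStr);
        Configuration conf = new Configuration(); // picks up the cluster's filesystem settings, including the GCS connector
        FileSystem fs = path.getFileSystem(conf);
        try (FSDataOutputStream out = fs.create(path, true)) {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }
}

In prepareRun() you could then call something like StateFileWriter.writeValue(config.statePath, Long.toString(config.numberOfRecords)) instead of writing to the KeyValueTable, and have later stages read the file back through the same FileSystem API.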

Answered on 2019-11-20T21:06:34.000