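I have written a custom Flume sink named MySink, whose process method is shown in the first snippet below. I am getting an IllegalStateException as follows (the detailed stack trace is in the second snippet below):
Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
QUESTION: While writing the process method, I followed KafkaSink and similar existing sink implementations in the Flume codebase, and I applied the same transaction handling logic as those existing sinks. Can you tell me what is wrong with my process method? How can I fix this?
PROCESS method (I have marked the location where the exception is thrown):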
    @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        Event event = null;
        try {
            LOG.info(getName() + " BEFORE txn.begin()");
            //!!!! EXCEPTION IS THROWN in the following LINE !!!!!!
            txn.begin();
            LOG.info(getName() + " AFTER txn.begin()");
            LOG.info(getName() + " BEFORE ch.take()");
            event = ch.take();
            LOG.info(getName() + " AFTER ch.take()");
            if (event == null) {
                // No event found, request back-off semantics from the sink runner
                LOG.info(getName() + " - EVENT is null! ");
                return Status.BACKOFF;
            }
            Map<String, String> keyValueMapInTheMessage = event.getHeaders();
            if (!keyValueMapInTheMessage.isEmpty()) {
                mDBWriter.insertDataToDB(keyValueMapInTheMessage);
            }
            LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
            if (txn != null) {
                txn.commit();
            }
        } catch (Exception ex) {
            String errMsg = getName() + " - Failed to publish events. Exception: ";
            LOG.info(errMsg);
            status = Status.BACKOFF;
            if (txn != null) {
                try {
                    txn.rollback();
                } catch (Exception e) {
                    LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
                    throw Throwables.propagate(e);
                }
            }
            throw new EventDeliveryException(errMsg, ex);
        } finally {
            if (txn != null) {
                txn.close();
            }
        }
        return status;
    }
Exception stack trace:
    2016-01-22 14:01:15,440 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: MySink - Failed to publish events. Exception:
        at com.XYZ.flume.maprdb.MySink.process(MySink.java:116)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
        at com.XYZ.flume.maprdb.MySink.process(MySink.java:82)
        ... 3 more
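
One possible reading of the trace, offered as a sketch and not a confirmed diagnosis: the `event == null` branch returns without committing or rolling back, so the transaction may still be OPEN when `close()` runs, and the next `process()` call may then get the same, still-open transaction back from `getTransaction()`. The small model below reproduces that sequence without any Flume dependency. `ModelTransaction`, `ModelChannel`, and `TransactionLifecycleDemo` are hypothetical names, and the state checks (in particular that `close()` rejects an OPEN transaction and that the channel reuses a transaction until it is CLOSED) are assumptions about how the channel behaves, not code copied from Flume.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified stand-in for a channel transaction; NOT Flume's real class.
class ModelTransaction {
    enum State { NEW, OPEN, COMPLETED, CLOSED }
    State state = State.NEW;

    void begin() {
        check(state == State.NEW, "begin() called when transaction is " + state + "!");
        state = State.OPEN;
    }
    void commit() {
        check(state == State.OPEN, "commit() called when transaction is " + state + "!");
        state = State.COMPLETED;
    }
    void rollback() {
        check(state == State.OPEN, "rollback() called when transaction is " + state + "!");
        state = State.COMPLETED;
    }
    // Assumption: close() refuses to close a transaction that is still OPEN.
    void close() {
        check(state == State.NEW || state == State.COMPLETED,
              "close() called when transaction is " + state);
        state = State.CLOSED;
    }
    private static void check(boolean ok, String message) {
        if (!ok) throw new IllegalStateException(message);
    }
}

// Hypothetical channel; assumption: it hands back the same transaction until it is CLOSED.
class ModelChannel {
    final Queue<String> events = new ArrayDeque<>();
    private ModelTransaction current;

    ModelTransaction getTransaction() {
        if (current == null || current.state == ModelTransaction.State.CLOSED) {
            current = new ModelTransaction();
        }
        return current;
    }
    String take() { return events.poll(); } // null when the channel is empty
}

class TransactionLifecycleDemo {
    // Same control flow as the process method in the question: early return on an empty channel.
    static String processLikeQuestion(ModelChannel ch) {
        ModelTransaction txn = ch.getTransaction();
        try {
            txn.begin();
            String event = ch.take();
            if (event == null) {
                return "BACKOFF"; // txn is still OPEN, so close() in finally fails
            }
            txn.commit();
            return "READY";
        } catch (RuntimeException ex) {
            txn.rollback();
            throw new RuntimeException("Failed to publish events", ex);
        } finally {
            txn.close();
        }
    }

    // Variant that always commits (an empty transaction too) before close().
    static String processWithCommitOnEmpty(ModelChannel ch) {
        ModelTransaction txn = ch.getTransaction();
        String status = "READY";
        try {
            txn.begin();
            String event = ch.take();
            if (event == null) {
                status = "BACKOFF"; // nothing to deliver, but still fall through to commit
            }
            txn.commit();
        } catch (RuntimeException ex) {
            txn.rollback();
            throw new RuntimeException("Failed to publish events", ex);
        } finally {
            txn.close();
        }
        return status;
    }

    // Runs a call and returns the root-cause message, or null if nothing was thrown.
    static String failureOf(Runnable call) {
        try {
            call.run();
            return null;
        } catch (RuntimeException e) {
            Throwable root = e;
            while (root.getCause() != null) root = root.getCause();
            return root.getMessage();
        }
    }

    public static void main(String[] args) {
        ModelChannel ch = new ModelChannel(); // empty channel
        System.out.println("call 1: " + failureOf(() -> processLikeQuestion(ch)));
        System.out.println("call 2: " + failureOf(() -> processLikeQuestion(ch)));
        ModelChannel ch2 = new ModelChannel();
        System.out.println("fixed:  " + processWithCommitOnEmpty(ch2));
    }
}
```

In this model the first call on an empty channel fails inside `close()`, and only the second call fails inside `begin()`, which would match the trace above if an earlier `close()` failure went unnoticed in the log. If the real channel behaves like the model, committing (or rolling back) before every return path, as `processWithCommitOnEmpty` does, would avoid the error.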