nifi.processor.scheduling.timeout 是否真的默认为无限,如管理指南中所述?当我查看代码时,它看起来在 60 秒后超时。我们有一个处理器需要一些时间来启动(加载资源)并且遇到“等待 OnScheduled 时超时”错误。只是想弄清楚为什么它有时会在启动时失败,然后还会继续失败并出现同样的错误。
真的很奇怪。关闭所有处理器,启动实例并单独启动处理器似乎可以解决问题。但是,如果它们都打开并且实例重新启动,我们会遇到错误。
可能很容易成为其他东西,但启动顺序似乎有效。
来自 NIFI Github 的代码片段,我在其中发现超时错误
String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
long onScheduleTimeout = timeoutString == null ? 60000
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
Future<?> taskFuture = callback.invokeMonitoringTask(task);
try {
taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName()
+ "' lifecycle OnScheduled operation to finish.");
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e);
} catch (TimeoutException e) {
taskFuture.cancel(true);
LOG.warn("Timed out while waiting for OnScheduled of '"
+ this.processor.getClass().getSimpleName()
+ "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not "
+ "guarantee that the task will be canceled since the code inside current OnScheduled operation may "
+ "have been written to ignore interrupts which may result in runaway thread which could lead to more issues "
+ "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '"
+ this.processor + "' that needs to be documented, reported and eventually fixed.");
throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e);
} catch (ExecutionException e){
throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e);
} finally {
callback.postMonitor();
}