我编写了一个基本的自定义处理器,它将流发送到“重试”关系并调用惩罚。
package nlsn.processors.core.main;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
public class CustomWait extends AbstractProcessor {
public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
.name("SUCCESS").description("well done, carry on").build();
public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
.name("FAILURE.").description("fail").build();
public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
.name("RETRY").description("point it back to processor").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(SUCCESS_RELATIONSHIP);
relationships.add(FAILURE_RELATIONSHIP);
relationships.add(POINT_TO_SELF_RELATIONSHIP);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile flowFile = session.get();
if (flowFile != null) {
logger.info("flow file is not null.");
String state = flowFile.getAttribute("_wait_state");
if (state == null || state.isEmpty()) {
logger.info("\"_wait_state\" attribute is missing, going into WAIT.");
flowFile = session.putAttribute( flowFile, "_wait_state", "1");
flowFile = session.penalize(flowFile);
session.transfer( flowFile, POINT_TO_SELF_RELATIONSHIP );
} else {
logger.info("\"_wait_state\" attribute is available, breaking WAIT.");
flowFile = session.removeAttribute( flowFile, "_wait_state" );
session.transfer( flowFile, SUCCESS_RELATIONSHIP);
}
} else {
//logger.info("flow file is null (bad)!!!.");
}
}
}
代码按预期工作。但我想知道为什么任务数(192,569)如此之高。正如预期的那样,过程在 30 秒内完成?
(请参阅 CustomWait 处理器任务计数)
- 什么是 nifi 在后台运行?
- 这么大的计数实际上会占用 CPU 吗?
- 如果这很糟糕,如何解决?
谢谢