1

我编写了一个基本的自定义处理器,它会对流文件进行惩罚(penalize),并将其发送到“重试”(RETRY)关系。

package nlsn.processors.core.main;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "_wait_state",
        description = "Marker showing that the flow file has already been through one wait cycle") })
@WritesAttributes({ @WritesAttribute(attribute = "_wait_state",
        description = "Set to \"1\" when the flow file is sent to RETRY; removed before it is sent to SUCCESS") })
/**
 * Processor that delays each flow file by one penalization period.
 *
 * <p>On the first pass a flow file has no {@code _wait_state} attribute: the attribute is set,
 * the flow file is penalized and routed to {@link #POINT_TO_SELF_RELATIONSHIP}. On the next pass
 * the attribute is present: it is removed and the flow file is routed to
 * {@link #SUCCESS_RELATIONSHIP}.
 */
public class CustomWait extends AbstractProcessor {

    /** Flow files that have completed their wait are routed here. */
    public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS").description("well done, carry on").build();

    /**
     * Declared but never used by {@link #onTrigger}.
     * NOTE(review): the trailing '.' in the name looks like a typo; it is left unchanged here
     * because renaming a relationship may break flows that already reference it - confirm.
     */
    public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
            .name("FAILURE.").description("fail").build();

    /** Flow files that still have to wait are penalized and routed here. */
    public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
            .name("RETRY").description("point it back to processor").build();

    /** Attribute used to remember that a flow file has already been penalized once. */
    private static final String WAIT_STATE_ATTRIBUTE = "_wait_state";

    /** This processor has no configurable properties; never null. */
    private List<PropertyDescriptor> descriptors = Collections.<PropertyDescriptor>emptyList();

    /** Never null, so getRelationships() is safe even before init() has run. */
    private Set<Relationship> relationships = Collections.<Relationship>emptySet();


    /**
     * Builds the immutable set of relationships exposed by this processor.
     *
     * @param context initialization context (unused)
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final Set<Relationship> all = new HashSet<Relationship>();
        all.add(SUCCESS_RELATIONSHIP);
        all.add(FAILURE_RELATIONSHIP);
        all.add(POINT_TO_SELF_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(all);
    }

    /** @return the relationships registered in {@link #init} */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the supported property descriptors; an empty list, never {@code null} */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Lifecycle hook; intentionally empty because there is nothing to prepare.
     *
     * @param context process context (unused)
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        // nothing to do
    }

    /**
     * Routes one flow file to RETRY (penalized) on its first pass and to SUCCESS on its second.
     *
     * @param context process context, used to yield when no flow file is available
     * @param session session used to fetch, modify and transfer the flow file
     * @throws ProcessException declared by the overridden method; not thrown directly here
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            // Nothing could be pulled from the queue (for example the only queued flow file is
            // still penalized). Yield so the processor is not re-triggered in a tight loop,
            // which would inflate the task count and waste CPU.
            context.yield();
            return;
        }

        logger.info("flow file is not null.");
        final String state = flowFile.getAttribute(WAIT_STATE_ATTRIBUTE);
        final boolean firstPass = (state == null || state.isEmpty());

        if (firstPass) {
            logger.info("\"_wait_state\" attribute is missing, going into WAIT.");
            flowFile = session.putAttribute(flowFile, WAIT_STATE_ATTRIBUTE, "1");
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, POINT_TO_SELF_RELATIONSHIP);
        } else {
            logger.info("\"_wait_state\" attribute is available, breaking WAIT.");
            flowFile = session.removeAttribute(flowFile, WAIT_STATE_ATTRIBUTE);
            session.transfer(flowFile, SUCCESS_RELATIONSHIP);
        }
    }
}

在此处输入图像描述

代码按预期工作,整个流程也如预期在 30 秒内完成。但我想知道为什么任务数(192,569)如此之高?

(请参阅 CustomWait 处理器任务计数)

  1. NiFi 在后台到底在运行什么?
  2. 这么大的计数实际上会占用 CPU 吗?
  3. 如果这很糟糕,如何解决?

谢谢

4

1 回答 1

3
  1. 只要为处理器提供输入的队列中有流文件(FF),NiFi 控制器就会调度该处理器运行,而不检查 FF 是否处于惩罚状态。在处理器的 onTrigger 中,它会尝试通过 session.get() 从输入队列中获取 FF。session.get() 不会返回任何被惩罚的 FF,因此最终会返回 null。这就是为什么需要检查 FF 是否为 null,而且返回 null 并不是坏事。我假设您没有更改运行计划(Run Schedule),这意味着控制器会尽可能快地反复运行该处理器,从而导致任务计数膨胀。
  2. 它正在尝试检查要处理的输入,因此它正在使用 CPU。是否占用取决于可用任务的数量和系统上运行的处理器。
  3. 本质上并不坏,但可以通过将运行计划(Run Schedule)设置为非 0 的值来减少任务计数。
于 2018-08-19T00:55:35.213 回答