
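I updated some attributes on a FlowFile and published it to Kafka, but when I consume the same message with the ConsumeKafka_2_0 processor, the attributes are missing. Is this not supported? Do I need to customize the processor?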

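Looking at the processor's source code below, it appears to read attributes from the record and write them to the FlowFile, so why are they not available on the FlowFile?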

private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
    FlowFile flowFile = session.create();
    final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
    tracker.incrementRecordCount(1);
    final byte[] value = record.value();
    if (value != null) {
        flowFile = session.write(flowFile, out -> {
            out.write(value);
        });
    }
    flowFile = session.putAllAttributes(flowFile, getAttributes(record));
    tracker.updateFlowFile(flowFile);
    populateAttributes(tracker);
    session.transfer(tracker.flowFile, REL_SUCCESS);
}

1 Answer


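To pass attributes along, you must use Kafka headers. There is no other way to carry them, because attributes are not part of the FlowFile content, and only the FlowFile content becomes the body of the Kafka message.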

On the publishing side, PublishKafka_2_0 has the following property for specifying which attributes to send as headers:

static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
        .name("attribute-name-regex")
        .displayName("Attributes to Send as Headers (Regex)")
        .description("A Regular Expression that is matched against all FlowFile attribute names. "
            + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
            + "If not specified, no FlowFile attributes will be added as headers.")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
        .required(false)
        .build();
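
To make the effect of this regex concrete, here is a minimal sketch in plain Java of how attribute names could be filtered into headers. It is not NiFi's code: the class `AttributeHeaderSketch`, the helper `selectHeaders`, and the sample attribute names are hypothetical, and it assumes the regex must match the whole attribute name and that values are encoded as UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class AttributeHeaderSketch {

    // Hypothetical helper (not part of NiFi): returns the attributes whose
    // names fully match the regex, encoded as header name -> header bytes.
    static Map<String, byte[]> selectHeaders(Map<String, String> attributes, String attributeNameRegex) {
        Pattern pattern = Pattern.compile(attributeNameRegex);
        Map<String, byte[]> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            // Assumption: full-match semantics on the attribute name.
            if (pattern.matcher(entry.getKey()).matches()) {
                headers.put(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("order.id", "42");
        attributes.put("order.region", "eu");
        attributes.put("filename", "a.json");

        // Only the attributes starting with "order." are selected as headers.
        Map<String, byte[]> headers = selectHeaders(attributes, "order\\..*");
        System.out.println(headers.keySet());
    }
}
```

In this sketch, leaving the regex unset corresponds to sending no headers at all, which is why the attributes in the question never reached the consumer.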

On the consuming side, ConsumeKafka_2_0 has the following property for specifying which header fields to add as attributes:

static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
        .name("header-name-regex")
        .displayName("Headers to Add as Attributes (Regex)")
        .description("A Regular Expression that is matched against all message headers. "
            + "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
            + "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
            + "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
            + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
            + "the messages together efficiently.")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
        .required(false)
        .build();
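
The bundling caveat in that description can be illustrated with a small plain-Java sketch. It is not NiFi's implementation: `HeaderBundlingSketch`, `selectAttributes`, `bundle`, and `message` are hypothetical names, header values are simplified to strings (Kafka stores them as bytes), and full-match regex semantics are assumed. Messages are grouped by the header values the regex selects, so each group stands in for one FlowFile.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

final class HeaderBundlingSketch {

    // Keeps only the headers whose names match the regex; in the real
    // processor these would become FlowFile attributes.
    static Map<String, String> selectAttributes(Map<String, String> headers, Pattern headerNameRegex) {
        Map<String, String> selected = new TreeMap<>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            if (headerNameRegex.matcher(entry.getKey()).matches()) {
                selected.put(entry.getKey(), entry.getValue());
            }
        }
        return selected;
    }

    // Groups message indexes so that each bundle contains only messages
    // with identical selected header values.
    static Map<Map<String, String>, List<Integer>> bundle(List<Map<String, String>> messages, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Map<Map<String, String>, List<Integer>> bundles = new LinkedHashMap<>();
        for (int i = 0; i < messages.size(); i++) {
            Map<String, String> key = selectAttributes(messages.get(i), pattern);
            bundles.computeIfAbsent(key, k -> new ArrayList<>()).add(i);
        }
        return bundles;
    }

    // Builds a sample message's headers: a shared "source" and a unique "id".
    static Map<String, String> message(String source, String id) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("source", source);
        headers.put("id", id);
        return headers;
    }

    public static void main(String[] args) {
        List<Map<String, String>> messages = new ArrayList<>();
        messages.add(message("web", "1"));
        messages.add(message("web", "2"));
        messages.add(message("web", "3"));

        // Selecting only the shared header keeps all messages in one bundle.
        System.out.println(bundle(messages, "source").size());
        // Selecting every header, including the unique id, splits them apart.
        System.out.println(bundle(messages, ".*").size());
    }
}
```

With the sample data, the regex `source` yields a single bundle, while `.*` yields one bundle per message because the `id` header differs each time. That is the inefficiency the property description warns about.
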
Answered 2019-04-09T13:39:53.663