我在文件中有一个事件(JSON 消息),需要通过 filebeat发送到 Kafka 。JSON 消息如下所示:
{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}
我想将此消息发送给 Kafka。分区键应该是 JSON 事件消息中的应用程序字段。如何在 JSON 消息中提供自定义应用程序字段作为 Kafka 记录的分区键?
filebeat.yml 是这样的:
…
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[message.application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
compression: none
https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf 根据this reference,我们可以使用格式字符串规范来引用事件字段值。
使用此配置,默认消息“默认”始终报告为键。如何配置 filebeat.yml 以提取自定义应用程序字段并将此信息用作 Kafka 分区键?
此外,我尝试在输入部分定义一个字段,如下所示:
type: log
enabled: true
paths:
- /var/logs/*event.log
fields:
log_topic: "event"
application: '%{[application]} string'
fields_under_root: true
和相应的kafka输出为:
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
但随后 kafka 分区键始终为:%{[application]} string