4

我正在编写一个 kibana 插件和一个 logstash 管道。对于我的测试,我只是写了一个这样的logstash输入:

input {

   beats {
        port => 9600
        ssl => false
        ssl_verify_mode => "none"
    }

}

但是当我尝试打开与节点的连接时(上面的代码):

invoke = (parameters, id, port, host) => {
        var fs = require('fs');

        console.log(`Sending message in beats, host= ${host}, port= ${port}, message= ${parameters.message}`);

        var connectionOptions = {
            host: host,
            port: port
        };

        var client = lumberjack.client(connectionOptions, {rejectUnauthorized: false, maxQueueSize: 500});

        client.writeDataFrame({"line": id + " " + parameters.message});
    }

logstash 给我“beats 协议的无效版本:22”和“beats 协议的无效版本:3”:

Caused by: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 22
        at org.logstash.beats.Protocol.version(Protocol.java:22) ~[logstash-input-beats-6.0.11.jar:?]
        at org.logstash.beats.BeatsParser.decode(BeatsParser.java:62) ~[logstash-input-beats-6.0.11.jar:?]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        ... 9 more
[2020-08-11T07:49:47,954][INFO ][org.logstash.beats.BeatsHandler] [local: 172.22.0.40:9600, remote: 172.22.0.1:33766] Handling exception: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 3
[2020-08-11T07:49:47,955][WARN ][io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 3
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:404) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:371) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$300(AbstractChannelHandlerContext.java:61) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.channel.AbstractChannelHandlerContext$4.run(AbstractChannelHandlerContext.java:253) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.49.Final.jar:4.1.49.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.logstash.beats.InvalidFrameProtocolException: Invalid version of beats protocol: 3
        at org.logstash.beats.Protocol.version(Protocol.java:22) ~[logstash-input-beats-6.0.11.jar:?]
        at org.logstash.beats.BeatsParser.decode(BeatsParser.java:62) ~[logstash-input-beats-6.0.11.jar:?]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) ~[netty-all-4.1.49.Final.jar:4.1.49.Final]
        ... 11 more
4

1 回答 1

0
  1. 您可以尝试使用 tcp 输入,而不是使用 beats 输入。

例子:

input {
  tcp {
    port => "9600"
    codec => "json"
  }
}
  1. 如果你使用beats输入,想要使用Logstash对Filebeat采集的数据进行额外处理,需要配置Filebeat使用Logstash。

为此,您可以编辑 Filebeat 配置文件以通过将其注释掉来禁用 Elasticsearch 输出,并通过取消注释 Logstash 部分来启用 Logstash 输出:

output.logstash:
     hosts: ["127.0.0.1:5044"]

您可以在https://www.elastic.co/guide/en/beats/filebeat/current/logstash-output.html上阅读更多内容

于 2021-05-02T16:33:21.833 回答