0

我是 Logstash 和 Avro 的初学者。我们正在建立一个系统,其中 logstash 作为 kafka 队列的生产者。但是,我们遇到了一个问题,即 Logstash 产生的 avro 序列化事件无法被 apache 提供的 avro-tools jar(版本 1.8.2)解码。此外,我们注意到 Logstash 和 avro-tools 的序列化输出不同。

我们有以下设置:

  • Logstash 5.5 版
  • logstash avro 编解码器版本 3.2.1
  • 卡夫卡版本 0.10.1
  • avro-tools jar 版本 1.8.2

例如,考虑以下架构:

{
"name" : "avroTestSchema",
"type" : "record",
"fields" : [ {
  "name" : "testfield1",
  "type" : "string"
  },
  {
  "name" : "testfield2",
  "type" : "string"
  }
]
}

和以下 json 字符串:

{"testfield1":"somestring","testfield2":"anotherstring"}

使用 Logstash 进行序列化时。Logstash 配置文件:

input {
  stdin {
    codec => json
  }
}

filter {
 mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    codec => avro {
      schema_uri => "/path/to/TestSchema.avsc"
    }
    topic_id => "avrotestout"
  }
  stdout {
    codec => rubydebug
  }
}

输出(使用猫):

FHNvbWVzdHJpbmcaYW5vdGhlcnN0cmluZw==  

使用 avro-tools 进行序列化时。命令:

java -jar avro-tools-1.8.2.jar jsontofrag --schema-file TestSchema.avsc message.json

输出

somestringanotherstring

命令:

java -jar avro-tools-1.8.2.jar fromjson --schema-file TestSchema.avsc message.json

输出:

Objavro.codenullavro.schema▒{"type":"record","name":"avroTestSchema","fields":[{"name":"testfield1","type":"string"},{"name":"testfield2","type":"string"}]}▒▒▒▒&70▒▒Hs▒U2somestringanotherstring▒▒▒▒&70▒▒Hs▒U

所以我们的问题是:我们如何配置 Logstash 以使输出与 apache avro-tools jar 兼容?

更新:我们发现 logstash 产生的 avro 输出是 base64 编码的。但是找不到发生这种情况的位置,以及如何使其与 avro-tools 兼容

4

1 回答 1

2

正如更新中提到的,我们发现标准的 Logstash Avro 编解码器在 avro 输出中添加了一个非可选的 base64 编码。我们发现这是不可取的。所以我们分叉了编解码器并使这种编码可配置。我们对此进行了测试,它在我们的几个系统上开箱即用。

分支在 github 上可用:https ://github.com/Rubyan/logstash-codec-avro

要设置(或取消设置)base64 编码,请将其添加到您的 logstash 配置文件中:

output {
     stdout {
        codec => avro {
            schema_uri => "schema.avsc"
            base64_encoding => false
        }
    }
}
于 2017-08-30T10:58:26.857 回答