1

我正在做 confluent kafka connect 5.2.3 版的 poc。我们正在尝试将主题的消息复制到一个文件作为备份,并在需要时从该文件中复制回主题。

主题有 Key =string Value=protbuf

我在用

key.convertor=org.apache.kafka.connect.storgare.StringConvertor value.convertor=com.blueapron.connect.protobuf.ProtobufConvertor value.convertor.protoClassName=<proto class name>

接收器配置

name=test
connector.class=FileStreamSink
tasks.max=1
file=test.txt
topics=testtopic

源配置

name=test
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topics=testtopic_connect

我能够成功地将其下沉到文件内容如下的文件中

Struct{<message in name value pair>}
Struct{<message in name value pair>}

……

我用同一个文件将它返回到不同的主题。当我运行源它抛出错误

字符串不能转换为 org.apache.kafka.connect.data.Struct。

问题是

  • 当我的 kafka 主题具有键值对时,为什么我在文件中看不到任何键。
  • 为什么源无法将内容从文件复制到主题并引发与投射相关的错误。
  • 当我使用 kafka 提供的 ByteArrayConvertor 时,我得到了类似的错误。字符串不能转换为字节。理想情况下 ByteArrayConvertor 应该适用于任何类型的数据。
  • blueapron 是否仅适用于 protobuf3 版本?
4

2 回答 2

0

blueapron 是否仅适用于 protobuf3 版本?

是的,因为我们只在内部使用 proto3(我们需要 Python、Java 和 Ruby 代码生成 - Ruby 仅适用于 proto3),不幸的是我们在第一次构建这个工具时没有构建对 proto2 定义的支持。我已经更新了项目自述文件以反映这一点(https://github.com/blueapron/kafka-connect-protobuf-converter#compatibility)。

于 2019-12-30T19:19:04.607 回答
0

当我的 kafka 主题具有键值对时,为什么我在文件中看不到任何键

FileSink 不写入键,只写入值,通过.toString对该数据执行结构或非结构。有一个 SMT 可以将密钥移入值中 - https://github.com/jcustenborder/kafka-connect-transform-archive

但文件的行仍然看起来像Struct{key= ..., value=...}

为什么源无法将内容从文件复制到主题并引发与投射相关的错误。

当我使用 kafka 提供的 ByteArrayConvertor 时,我得到了类似的错误。字符串不能转换为字节。理想情况下 ByteArrayConvertor 应该适用于任何类型的数据。

FileSource 仅从文件中读取以行分隔的字段作为字符串。

blueapron 是否仅适用于 protobuf3 版本?

好像是这样 - https://github.com/blueapron/kafka-connect-protobuf-converter/blob/v2.2.1/pom.xml#L21


本质上,如果您想完全备份一个主题(包括时间戳、标题,甚至可能是偏移量),您将需要转储和读取实际二进制数据的东西(这不是 ByteArrayConverter 所做的,它只会(反)序列化数据从/到 Kafka,而不是源/接收器,并且 FileSource 连接器将尝试始终将文件作为字符串读回

于 2019-10-14T23:08:52.163 回答