0

我在 S3 Kafka 连接器中遇到了问题,但在 JDBC 连接器中也看到了这个问题。我试图了解如何确保我的连接器实际上正在使用某个主题中的所有数据。我预计由于刷新大小,消息的消费可能会有一定的延迟(10/15 分钟),但我注意到我最终会有很大的延迟(几天......),我的消费者总是有一些东西在偏移量滞后

例如,我正在阅读/查看有关此的帖子/视频(主要是该评论):https ://rmoff.net/2020/12/08/twelve-days-of-smt-day-1-insertfield-timestamp/ https ://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc “flush.size 16 太低了,但如果太高你必须等待你的文件会出现在 S3 中,我等的很无聊。”

如果flush.size大于可用记录,它确实提到了这些记录可能需要一些时间来消耗,但我从没想过这会超过几分钟。如何确保所有记录都被使用,我真的很想避免 flush.size = 1

也许这只是我对接收器连接器的误解,但我确实希望它们能够作为普通消费者工作,所以我希望它们能够使用所有数据,并且这种刷新/批量大小将更多地基于超时工作以及性能问题。

如果有人感兴趣,这是我的连接器配置

对于 S3 接收器:

topics.regex: com.custom.obj_(.*)
storage.class: io.confluent.connect.s3.storage.S3Storage
s3.region: ${@S3_REGION@}
s3.bucket.name: ${@S3_BUCKET@}
topics.dir: ${@S3_OBJ_TOPICS_DIR@}
flush.size: 200
rotate.interval.ms: 20000
auto.register.schemas: false
s3.part.size: 5242880
parquet.codec: snappy
offset.flush.interval.ms: 20000
offset.flush.timeout.ms: 5000
aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
format.class: com.custom.connect.s3.format.parquet.ParquetFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
timestamp.extractor: Record
locale: ${@S3_LOCALE@}
timezone: ${@S3_TIMEZONE@}
store.url: ${@S3_STORAGE_URL@}
connect.meta.data: false
transforms: kafkaMetaData,formatTs
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
transforms.formatTs.field: message_ts
transforms.formatTs.target.type: string
transforms.formatTs.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true

对于 JDBC 接收器:

topics.regex: com.custom.obj_(.*)
table.name.format: ${@PREFIX@}${topic}
batch.size: 200
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
connection.url: ${@DB_URL@}
connection.user: ${@DB_USER@}
connection.password: ${@DB_PASSWORD@}
auto.create: true
auto.evolve: true
db.timezone: ${@DB_TIMEZONE@}
quote.sql.identifiers: never
transforms: kafkaMetaData
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true

我已经读过这两个但仍然不确定: Kafka JDBC Sink Connector, insert values in batches https://github.com/confluentinc/kafka-connect-jdbc/issues/290

另外例如,我已经看到人们使用的示例(我认为这对我的用例没有帮助),但我想知道这个值是每个连接器定义的吗?我什至有点困惑,在文档中我总是找到没有消费者的配置。但我总是在消费者身上找到例子。所以我想这意味着这是一个适用于消费者和生产者的通用属性?

consumer.max.interval.ms: 300000
consumer.max.poll.records: 200

有人有好的反馈吗?

4

1 回答 1

1

关于提供的 Kafka S3 接收器连接器配置:

topics.regex: com.custom.obj_(.*)
storage.class: io.confluent.connect.s3.storage.S3Storage
s3.region: ${@S3_REGION@}
s3.bucket.name: ${@S3_BUCKET@}
topics.dir: ${@S3_OBJ_TOPICS_DIR@}
flush.size: 200
rotate.interval.ms: 20000
auto.register.schemas: false
s3.part.size: 5242880
parquet.codec: snappy
offset.flush.interval.ms: 20000
offset.flush.timeout.ms: 5000
aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
format.class: com.custom.connect.s3.format.parquet.ParquetFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
timestamp.extractor: Record
locale: ${@S3_LOCALE@}
timezone: ${@S3_TIMEZONE@}
store.url: ${@S3_STORAGE_URL@}
connect.meta.data: false
transforms: kafkaMetaData,formatTs
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
transforms.formatTs.field: message_ts
transforms.formatTs.target.type: string
transforms.formatTs.type:org.apache.kafka.connect.transforms.TimestampConverter$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true

您可以调整配置字段以控制消耗\上传到 S3 的速率。从而减少您所看到的 Kafka 偏移中的滞后。在配置中为以下字段使用变量的最佳实践。

根据个人经验,您可以做的调整是:

  1. 调整flush.size

    flush.size: 800

这是(如你所说):

最大记录数:连接器的 flush.size 配置属性指定应写入单个 S3 对象的最大记录数。此设置没有默认值。

我更喜欢更大的文件并使用下面的时间调整来控制消耗。确保您的记录不会因flush.size * RECORD_SIZE而太大或太小而无法制作合理的文件。

  1. 调整 rotate.interval.ms

    rotate.interval.ms: (i would delete this field, see rotate.schedule explanation below)

这是:

记录时间的最大跨度:连接器的 rotate.interval.ms 指定文件可以保持打开状态并准备好接收其他记录的最大时间跨度(以毫秒为单位)。

  1. 添加字段rotate.schedule.interval.ms:

    rotate.schedule.interval.ms 60000

这是:

计划轮换:连接器的 rotate.schedule.interval.ms 指定文件可以保持打开状态并准备好接收其他记录的最大时间跨度(以毫秒为单位)。与rotate.interval.ms 不同,在预定轮换中,每个文件的时间戳从第一条记录写入文件的系统时间开始。只要在 rotate.schedule.interval.ms 指定的时间跨度内处理记录,记录就会写入文件。在当前文件的时间跨度之后处理记录后,立即刷新文件,上传到 S3,并提交文件中记录的偏移量。创建一个新文件,其时间跨度从当前系统时间开始,并将记录写入文件。提交将在预定时间执行,无论之前的提交时间或消息数量如何。当您必须根据当前服务器时间(例如在每小时开始时)提交数据时,此配置很有用。默认值 -1 表示禁用此功能。

您使用默认值 -1,这意味着禁用此旋转。此调整将产生最大的不同,因为每个任务将更频繁地消耗。

关于问题的第二部分:

您可以通过向您的 kafka 添加指标并使用例如 prometheus 和 grafana 进行连接来获得可观察性。来源中的配置指南如下。

指标

资料来源:

连接 S3 接收器

kafka-monitoring-via-prometheus

连接 S3 Sink 配置文档

于 2021-01-25T10:29:26.640 回答