我在 S3 Kafka 连接器中遇到了问题,但在 JDBC 连接器中也看到了这个问题。我试图了解如何确保我的连接器实际上正在使用某个主题中的所有数据。我预计由于刷新大小,消息的消费可能会有一定的延迟(10/15 分钟),但我注意到我最终会有很大的延迟(几天......),我的消费者总是有一些东西在偏移量滞后
例如,我正在阅读/查看有关此的帖子/视频(主要是该评论):https ://rmoff.net/2020/12/08/twelve-days-of-smt-day-1-insertfield-timestamp/ https ://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc “flush.size 16 太低了,但如果太高你必须等待你的文件会出现在 S3 中,我等的很无聊。”
如果flush.size大于可用记录,它确实提到了这些记录可能需要一些时间来消耗,但我从没想过这会超过几分钟。如何确保所有记录都被使用,我真的很想避免 flush.size = 1
也许这只是我对接收器连接器的误解,但我确实希望它们能够作为普通消费者工作,所以我希望它们能够使用所有数据,并且这种刷新/批量大小将更多地基于超时工作以及性能问题。
如果有人感兴趣,这是我的连接器配置
对于 S3 接收器:
topics.regex: com.custom.obj_(.*)
storage.class: io.confluent.connect.s3.storage.S3Storage
s3.region: ${@S3_REGION@}
s3.bucket.name: ${@S3_BUCKET@}
topics.dir: ${@S3_OBJ_TOPICS_DIR@}
flush.size: 200
rotate.interval.ms: 20000
auto.register.schemas: false
s3.part.size: 5242880
parquet.codec: snappy
offset.flush.interval.ms: 20000
offset.flush.timeout.ms: 5000
aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
format.class: com.custom.connect.s3.format.parquet.ParquetFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
timestamp.extractor: Record
locale: ${@S3_LOCALE@}
timezone: ${@S3_TIMEZONE@}
store.url: ${@S3_STORAGE_URL@}
connect.meta.data: false
transforms: kafkaMetaData,formatTs
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
transforms.formatTs.field: message_ts
transforms.formatTs.target.type: string
transforms.formatTs.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true
对于 JDBC 接收器:
topics.regex: com.custom.obj_(.*)
table.name.format: ${@PREFIX@}${topic}
batch.size: 200
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
connection.url: ${@DB_URL@}
connection.user: ${@DB_USER@}
connection.password: ${@DB_PASSWORD@}
auto.create: true
auto.evolve: true
db.timezone: ${@DB_TIMEZONE@}
quote.sql.identifiers: never
transforms: kafkaMetaData
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true
我已经读过这两个但仍然不确定: Kafka JDBC Sink Connector, insert values in batches https://github.com/confluentinc/kafka-connect-jdbc/issues/290
另外例如,我已经看到人们使用的示例(我认为这对我的用例没有帮助),但我想知道这个值是每个连接器定义的吗?我什至有点困惑,在文档中我总是找到没有消费者的配置。但我总是在消费者身上找到例子。所以我想这意味着这是一个适用于消费者和生产者的通用属性?
consumer.max.interval.ms: 300000
consumer.max.poll.records: 200
有人有好的反馈吗?