问题标签 [s3-kafka-connector]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
245 浏览

apache-kafka-connect - Camel aws-s3 Source Connector Error - 应该如何更改配置

我正在使用我们的 Confluent (5.5.1) 安装定义 Camel S3 Source 连接器。创建连接器并将状态检查为“正在运行”后,我将文件上传到我的 S3 存储桶。即使我对存储桶进行 ls 操作,它也是空的,这表明该文件已被处理和删除。但是,我没有看到该主题中的消息。我基本上按照这个示例尝试一个简单的 4 行文件,而不是独立的 kafka,而是在一个融合集群上执行它。

这是我的配置

我在日志中看到了这些错误

如果我对连接器的状态进行 curl 请求,我会收到此状态错误

我在几个链接中看到了以下解决方案,但这也没有帮助。它建议在配置中添加以下键

谢谢

更新

我将配置削减到最小,但仍然得到相同的错误

仍然得到同样的错误

不知道我还应该在哪里寻找根本原因

0 投票
1 回答
312 浏览

apache-kafka - Kafka 连接路径格式无法正常工作

我使用下面的脚本创建连接器,但在 S3 中,我看到 /year=2015/month=12/day=07/hour=15/ 的分区格式。有没有办法实现 'dt'=YYYY-MM-dd/'hour'=HH/ 格式的分区?

0 投票
1 回答
1339 浏览

apache-kafka - S3 Kafka 接收器连接器中是否有办法确保所有记录都被使用

我在 S3 Kafka 连接器中遇到了问题,但在 JDBC 连接器中也看到了这个问题。我试图了解如何确保我的连接器实际上正在使用某个主题中的所有数据。我预计由于刷新大小,消息的消费可能会有一定的延迟(10/15 分钟),但我注意到我最终会有很大的延迟(几天......),我的消费者总是有一些东西在偏移量滞后

例如,我正在阅读/查看有关此的帖子/视频(主要是该评论):https ://rmoff.net/2020/12/08/twelve-days-of-smt-day-1-insertfield-timestamp/ https ://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc “flush.size 16 太低了,但如果太高你必须等待你的文件会出现在 S3 中,我等的很无聊。”

如果flush.size大于可用记录,它确实提到了这些记录可能需要一些时间来消耗,但我从没想过这会超过几分钟。如何确保所有记录都被使用,我真的很想避免 flush.size = 1

也许这只是我对接收器连接器的误解,但我确实希望它们能够作为普通消费者工作,所以我希望它们能够使用所有数据,并且这种刷新/批量大小将更多地基于超时工作以及性能问题。

如果有人感兴趣,这是我的连接器配置

对于 S3 接收器:

对于 JDBC 接收器:

我已经读过这两个但仍然不确定: Kafka JDBC Sink Connector, insert values in batches https://github.com/confluentinc/kafka-connect-jdbc/issues/290

另外例如,我已经看到人们使用的示例(我认为这对我的用例没有帮助),但我想知道这个值是每个连接器定义的吗?我什至有点困惑,在文档中我总是找到没有消费者的配置。但我总是在消费者身上找到例子。所以我想这意味着这是一个适用于消费者和生产者的通用属性?

有人有好的反馈吗?

0 投票
1 回答
98 浏览

apache-kafka-connect - S3 连接器与 HourlyPartitioner 失败

当我们尝试使用默认配置通过 S3 接收器连接器写入 S3 时,工作正常,没有任何问题。但是当我们尝试每小时分区失败并出现以下错误时。请找到代码和错误消息并帮助我们解决此问题

默认 :

每小时:

错误 :

在此处输入图像描述

0 投票
1 回答
136 浏览

apache-kafka - 为什么我的 Kafka S3 连接器接收器在创建后会删除我的主题(Kafka 连接器重新启动)?

我正在使用 Confluent 的 Kafka 连接器将数据从 Kafka 接收到 MinIO 存储桶。我在io.confluent.connect.s3.S3SinkConnectorKubernetes 环境中使用。这是我当前的 S3 Sink 配置:

云环境部署后,客户希望能够动态控制主题(即随意添加和删除主题)。虽然我明白为什么这可能不理想,但我还是屈服于上级。

因此,为了执行主题添加,我使用了 Kafka REST API:

我知道代码并不漂亮,但它现在可以完成工作。/connectors/my-sink/config它本质上向端点发出了一个 PUT 请求,并附加了我的新主题。

这行得通。我的接收器有新主题,我可以发送消息。

但是,在 3-5 分钟内,我的 Kafka Sink Pod 开始重新启动(我认为)Kafka 连接器:

此时 pod 将失去主题。

我相信这是由config.action.reload = restart配置值引起的。我认为在收到新配置后,连接器将在 N 分钟后重新启动。但是,我似乎找不到任何关于如何改变这种行为的文档。也许我应该在我的 PUT 请求期间这样做,但这感觉很糟糕。这也只是在黑暗中的一个镜头。

有谁知道为什么我的连接器在更新配置的 PUT 请求后重新启动?有没有办法防止这种情况发生?

编辑#1:我试图添加config.action.reload = none,但是连接器仍然重新启动。

我查看了 Kafka Operator 中的日志,并没有触发重置。似乎与 Kafka 连接器操作员完全隔离。

0 投票
0 回答
167 浏览

apache-spark - 使用 kafka 连接器读取 Kafka 主题并写入 s3

我有以下数据框架构

我正在将上述数据框成功写入 kafka 主题(s3.topic)。接下来我想从 kafka 读取 s3.topic 并将数据存储在 s3 存储桶中。为此,我正在使用 kafka 连接器。我在 connect-distributed.properties 文件中进行了以下配置更改

以下是我用来写入 s3 存储桶的 curl 命令

我收到以下错误

错误 WorkerSinkTask{id=S3SinkConnector904-0} 在偏移量 0 和时间戳 1617585788233 的主题“emoji.analysis009”分区 0 中转换消息值时出错:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。(org.apache.kafka.connect.runtime.WorkerSinkTask:547) org.apache.kafka.connect.errors.DataException:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。在 org.apache.kafka 的 org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)。

如何使用上面显示的 printschema 将我的主题数据写入 s3 存储桶?

更新

以下是写入 kafka 主题的数据帧的 printSchema() 输出

0 投票
2 回答
633 浏览

apache-kafka - 无法在kafka connect中使用接收器连接器

我正在尝试在 kafka connect 中使用 S3 sink 连接器,它会启动并稍后失败。

我的配置看起来像:

我的 connect-distributed.properties 看起来像:

完整的错误日志:

消息类型:

在此处输入图像描述

新的错误日志: 在此处输入图像描述

0 投票
1 回答
39 浏览

amazon-s3 - confluent s3 源连接器如何知道它已经摄取了哪些文件以及哪些文件是新的?

https://docs.confluent.io/kafka-connect-s3-source/current/

我认为这个连接器会轮询 s3 以获取文件列表——但它是否会保留有关已处理哪些文件以及哪些是新文件的状态?如果它确实存储状态,那么状态存储在哪里?

0 投票
0 回答
59 浏览

apache-kafka - 如何在使用 S3 sink 连接器时增加现有主题的分区

我正在使用 S3 sink 连接器和 kafka connect 并尝试将数据加载到 s3 。不知何故,我无法更新或增加主题分区的大小,也无法更改 offset.flush.timeout.ms 值。我正在尝试将其添加到我正在使用的 S3 连接器 curl 文件中,但没有任何更新。

0 投票
0 回答
26 浏览

apache-kafka-connect - Kakfa 源 Postgres 连接器显示为接收器连接器

作为源创建的 PostgresConnector 失败,并且在 Kafka Connect UI 上也显示为接收器连接器。我正在为 Kafka 连接使用基于 Kubernetes 的解决方案。以下是连接器的配置 -