问题标签 [s3-kafka-connector]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
117 浏览

kubernetes - Apache 中的日期时间替换 - Camel Kubernetes Yaml 文件

我想使用参考文档https://ibm-cloud-architecture.github.io/refarch-eda/scenarios/connect-s3/在 kubernetes 中部署 S3 kafka sink 连接器(使用 Apache Camel)

此 Kubernetes 部署文件中使用的DateTime 占位符(${date:now:yyyyMMdd-HHmmssSSS}未解析。而 ${file:} 已解析。

我在本文档中看到了 ${date:now:yyyyMMdd} 和 exchangeId ( https://camel.apache.org/components/latest/languages/simple-language.html )

0 投票
1 回答
1059 浏览

apache-spark - Spark 无法读取 AvroParquetWriter 编写的 Parquet 文件中的 DECIMAL 列

我有一些使用 AvroParquetWriter(来自 Kafka Connect S3 连接器)编写的 Parquet 文件。

文件中的一列aseg_lat有一个 schema DECIMAL(9, 7)

我可以使用 PyArrow 和 PrestoSQL 很好地阅读该专栏。

尝试通过在 AWS EMR 上运行的 Spark 3.0.0 读取它,我收到以下错误:

我还尝试通过将 Hive SerDe 设置spark.sql.hive.convertMetastoreParquetfalse. 这使我可以阅读该DECIMAL列,但对于时间戳等其他列开始失败。

另一个观察结果是,更改DECIMAL(9, 7)DECIMAL(x, 7)(其中 x > 19)允许 Spark 读取列,但这对我来说不是一个可行的解决方案,因为我有多个 TB 的历史数据写入DECIMAL(9, 7)需要重新处理。

如何阅读SparkDECIMAL所写的内容?AvroParquetWriter

0 投票
1 回答
83 浏览

apache-kafka - kafka connect s3-source connector的压缩文件保存在哪里

我已经下载了 confluent 网页中给出的 s3-source 连接器 zip 文件。我没有找到放置提取文件的位置。我收到以下错误

错误图像

请指导我。要加载连接器,我使用此命令 -

0 投票
0 回答
418 浏览

amazon-web-services - 如何修改使用 Kafka Connect S3 连接器上传的 S3 对象的文件名?

我已经使用 S3 连接器几个星期了,我想更改连接器命名每个文件的方式。我正在使用 HourlyBasedPartition,因此每个文件的路径已经足以让我找到每个文件,并且我希望文件名对于所有文件都是通用的,例如“Data.json.gzip”(具有相应的路径从分区器)。

例如,我想从这个开始:

对此:

这样做的目的是只调用一次 S3 以稍后下载文件,而不必先查找文件名然后下载。

从名为“kafka-connect-s3”的文件夹中搜索文件,我找到了这个文件: https ://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src /main/java/io/confluent/connect/s3/TopicPartitionWriter.java最后具有以下一些功能:

我不知道这是否可以根据我想要做的定制,但似乎与我的意图接近/相关。希望能帮助到你。

(也向 Github 提交了一个问题:https ://github.com/confluentinc/kafka-connect-storage-cloud/issues/369 )

0 投票
1 回答
611 浏览

amazon-s3 - Kafka-Connect:启动 S3 Sink 连接器时出现无法识别的错误

我正在尝试为具有 3 个节点的 Kafka Connect 集群设置我的第三个工作人员。工作人员在这第三个节点上正常运行,我能够进行 REST 调用以获取现有连接器(现在我有 2 个,每个节点上一个)。但是,当我尝试使用以下命令进行 POST 调用以创建第三个连接器时:

我得到这个 TimeoutException 响应:

当我查看工作人员堆栈跟踪时,它会显示以下内容:

跟踪的第一个日志是困扰我的,因为我没有看到任何有关我做错了什么的相关信息,而第二个日志只是 TimeoutException。我到处寻找有类似问题的人,或者了解“AbstractConfig”类但找不到任何有用的东西,这是来自 Kafka 的 AbstractConfig 类(我使用的是 Kafka 版本 2.0.0)。

最后,这是我正在使用的配置文件:

如果您觉得我应该包含任何其他信息,请随时询问,我对堆栈溢出相当陌生。

我很想知道是否有人遇到过这样的事情,或者是否有人知道是什么导致了这里的问题。谢谢!

0 投票
1 回答
467 浏览

apache-kafka - Kafka Connect S3 Sink Connector 按 id 字段对大型主题进行分区

过去几周,我们一直致力于将 Kafka Connect 添加到我们的数据平台,并认为这将是一种将 Kafka 中的数据提取到 S3 数据湖中的有用方法。我们已经使用 FieldPartitioner 和 TimeBasePartitioner 并看到了一些相当不错的结果。

我们还需要按用户 ID 进行分区 - 但是尝试在用户 ID 字段上使用 FieldPartitioner 后,连接器非常慢 - 尤其是与按日期分区等相比。我知道按 ID 分区会创建很多输出分区因此不会那么快——这很好,但它需要能够跟上生产者的步伐。

到目前为止,我们已经尝试增加内存和堆 - 但我们通常不会看到任何内存问题,除非我们将 flush.size 增加到一个很大的数字。我们还尝试了小刷新大小、非常小和大的 rotate.schedule.interval.ms 配置。我们还研究了网络,但这似乎很好 - 使用其他分区器网络保持良好。

在可能浪费大量时间之前,是否有人尝试或成功通过 id 字段进行分区,尤其是在较大的主题上,使用 S3 Sink 连接器?或者有没有人在配置或设置方面有任何建议,可能是一个不错的地方?

0 投票
1 回答
805 浏览

apache-kafka - Kafka Sink 连接器能否将记录时间戳作为存储在存储中的有效负载包含在内

我同时使用 S3 和 JDBC 接收器连接器,并且在存储数据时遇到了一些奇怪的行为。为了进行一些协调,我真的很想将 Kafka 摄取时间或记录生成时间保留到存储在 Sink 系统中的数据中。

我正在查看文档,但没有找到。我正在使用 Confluent 连接器,但如果允许我这样做,我也可以使用其他连接器,例如 Camel。

有人可以给我一些指示吗?

更新:根据 onecricketeer 的良好反馈,我知道我应该看看这个: https ://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield

而且我还看到了这个例子: Kafka连接消费者引用偏移量并存储在消息中

我会测试它,但我是否理解正确,例如理论上我可以做这样的事情:

这将在记录中为我创建 3 个新属性,称为 recordOffset、recordPartition 和 recordTimestamp,其中包含所描述的值。

如果我想确保这些值总是存在或失败,我需要做(不确定我是否理解后缀部分):

0 投票
1 回答
122 浏览

apache-kafka - 定义 Camel S3 源连接器时出错

我正在尝试在我们的融合环境中定义一个 Camel S3 源连接器。

这是我正在使用的配置

这是我收到的错误,当我尝试定义连接器时

上述错误的原因是什么?

有人告诉我,由于连接服务器是一个 ec2 实例,我不必在这里定义 AWS 参数。那是对的吗?

谢谢

注意:想要补充一下,有两个连接服务器,并且输出中仅显示其中一个错误

0 投票
0 回答
90 浏览

amazon-s3 - Camel Kafka S3 源连接器,具有用于同一存储桶的多个连接器

我正在尝试定义 Camel S3 Source 连接器。我进行了相当多的搜索,但没有成功找到以下问题的答案。

  • 如何设置我的连接器,以便 S3 存储桶中的文件在处理后不会被删除,但可以移动到我指定的另一个文件夹
  • 是否可以为同一个存储桶定义具有不同值转换器的单独连接器,也许是通过存储桶下的文件夹?连接器将根据文件类型使用不同的 kafka 主题。定义连接器属性时如何定义带有文件夹的存储桶

谢谢

0 投票
1 回答
229 浏览

apache-kafka - 运行 Kafka Connect S3 源连接器的 ClassNotFound 异常

我正在评估 Confluent Kafka S2 Source Connector 并遇到以下堆栈跟踪的问题:

连接器配置:

版本:

它可能是一个 JDK 错误:https ://bugs.openjdk.java.net/browse/JDK-8015099 。它已在 JDK 9+ 中修复。

Confluent docker 镜像confluentinc/cp-kafka-connect:5.2.4使用 JDK8:

关于可能出错的任何其他想法?