问题标签 [s3-kafka-connector]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
kubernetes - Apache 中的日期时间替换 - Camel Kubernetes Yaml 文件
此 Kubernetes 部署文件中使用的DateTime 占位符(${date:now:yyyyMMdd-HHmmssSSS}未解析。而 ${file:} 已解析。
我在本文档中看到了 ${date:now:yyyyMMdd} 和 exchangeId ( https://camel.apache.org/components/latest/languages/simple-language.html )
apache-spark - Spark 无法读取 AvroParquetWriter 编写的 Parquet 文件中的 DECIMAL 列
我有一些使用 AvroParquetWriter(来自 Kafka Connect S3 连接器)编写的 Parquet 文件。
文件中的一列aseg_lat
有一个 schema DECIMAL(9, 7)
。
我可以使用 PyArrow 和 PrestoSQL 很好地阅读该专栏。
尝试通过在 AWS EMR 上运行的 Spark 3.0.0 读取它,我收到以下错误:
我还尝试通过将 Hive SerDe 设置spark.sql.hive.convertMetastoreParquet
为false
. 这使我可以阅读该DECIMAL
列,但对于时间戳等其他列开始失败。
另一个观察结果是,更改DECIMAL(9, 7)
为DECIMAL(x, 7)
(其中 x > 19)允许 Spark 读取列,但这对我来说不是一个可行的解决方案,因为我有多个 TB 的历史数据写入DECIMAL(9, 7)
需要重新处理。
如何阅读SparkDECIMAL
所写的内容?AvroParquetWriter
apache-kafka - kafka connect s3-source connector的压缩文件保存在哪里
我已经下载了 confluent 网页中给出的 s3-source 连接器 zip 文件。我没有找到放置提取文件的位置。我收到以下错误
请指导我。要加载连接器,我使用此命令 -
amazon-web-services - 如何修改使用 Kafka Connect S3 连接器上传的 S3 对象的文件名?
我已经使用 S3 连接器几个星期了,我想更改连接器命名每个文件的方式。我正在使用 HourlyBasedPartition,因此每个文件的路径已经足以让我找到每个文件,并且我希望文件名对于所有文件都是通用的,例如“Data.json.gzip”(具有相应的路径从分区器)。
例如,我想从这个开始:
对此:
这样做的目的是只调用一次 S3 以稍后下载文件,而不必先查找文件名然后下载。
从名为“kafka-connect-s3”的文件夹中搜索文件,我找到了这个文件: https ://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src /main/java/io/confluent/connect/s3/TopicPartitionWriter.java最后具有以下一些功能:
我不知道这是否可以根据我想要做的定制,但似乎与我的意图接近/相关。希望能帮助到你。
(也向 Github 提交了一个问题:https ://github.com/confluentinc/kafka-connect-storage-cloud/issues/369 )
amazon-s3 - Kafka-Connect:启动 S3 Sink 连接器时出现无法识别的错误
我正在尝试为具有 3 个节点的 Kafka Connect 集群设置我的第三个工作人员。工作人员在这第三个节点上正常运行,我能够进行 REST 调用以获取现有连接器(现在我有 2 个,每个节点上一个)。但是,当我尝试使用以下命令进行 POST 调用以创建第三个连接器时:
我得到这个 TimeoutException 响应:
当我查看工作人员堆栈跟踪时,它会显示以下内容:
跟踪的第一个日志是困扰我的,因为我没有看到任何有关我做错了什么的相关信息,而第二个日志只是 TimeoutException。我到处寻找有类似问题的人,或者了解“AbstractConfig”类但找不到任何有用的东西,这是来自 Kafka 的 AbstractConfig 类(我使用的是 Kafka 版本 2.0.0)。
最后,这是我正在使用的配置文件:
如果您觉得我应该包含任何其他信息,请随时询问,我对堆栈溢出相当陌生。
我很想知道是否有人遇到过这样的事情,或者是否有人知道是什么导致了这里的问题。谢谢!
apache-kafka - Kafka Connect S3 Sink Connector 按 id 字段对大型主题进行分区
过去几周,我们一直致力于将 Kafka Connect 添加到我们的数据平台,并认为这将是一种将 Kafka 中的数据提取到 S3 数据湖中的有用方法。我们已经使用 FieldPartitioner 和 TimeBasePartitioner 并看到了一些相当不错的结果。
我们还需要按用户 ID 进行分区 - 但是尝试在用户 ID 字段上使用 FieldPartitioner 后,连接器非常慢 - 尤其是与按日期分区等相比。我知道按 ID 分区会创建很多输出分区因此不会那么快——这很好,但它需要能够跟上生产者的步伐。
到目前为止,我们已经尝试增加内存和堆 - 但我们通常不会看到任何内存问题,除非我们将 flush.size 增加到一个很大的数字。我们还尝试了小刷新大小、非常小和大的 rotate.schedule.interval.ms 配置。我们还研究了网络,但这似乎很好 - 使用其他分区器网络保持良好。
在可能浪费大量时间之前,是否有人尝试或成功通过 id 字段进行分区,尤其是在较大的主题上,使用 S3 Sink 连接器?或者有没有人在配置或设置方面有任何建议,可能是一个不错的地方?
apache-kafka - Kafka Sink 连接器能否将记录时间戳作为存储在存储中的有效负载包含在内
我同时使用 S3 和 JDBC 接收器连接器,并且在存储数据时遇到了一些奇怪的行为。为了进行一些协调,我真的很想将 Kafka 摄取时间或记录生成时间保留到存储在 Sink 系统中的数据中。
我正在查看文档,但没有找到。我正在使用 Confluent 连接器,但如果允许我这样做,我也可以使用其他连接器,例如 Camel。
有人可以给我一些指示吗?
更新:根据 onecricketeer 的良好反馈,我知道我应该看看这个: https ://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield
而且我还看到了这个例子: Kafka连接消费者引用偏移量并存储在消息中
我会测试它,但我是否理解正确,例如理论上我可以做这样的事情:
这将在记录中为我创建 3 个新属性,称为 recordOffset、recordPartition 和 recordTimestamp,其中包含所描述的值。
如果我想确保这些值总是存在或失败,我需要做(不确定我是否理解后缀部分):
apache-kafka - 定义 Camel S3 源连接器时出错
我正在尝试在我们的融合环境中定义一个 Camel S3 源连接器。
这是我正在使用的配置
这是我收到的错误,当我尝试定义连接器时
上述错误的原因是什么?
有人告诉我,由于连接服务器是一个 ec2 实例,我不必在这里定义 AWS 参数。那是对的吗?
谢谢
注意:想要补充一下,有两个连接服务器,并且输出中仅显示其中一个错误
amazon-s3 - Camel Kafka S3 源连接器,具有用于同一存储桶的多个连接器
我正在尝试定义 Camel S3 Source 连接器。我进行了相当多的搜索,但没有成功找到以下问题的答案。
- 如何设置我的连接器,以便 S3 存储桶中的文件在处理后不会被删除,但可以移动到我指定的另一个文件夹
- 是否可以为同一个存储桶定义具有不同值转换器的单独连接器,也许是通过存储桶下的文件夹?连接器将根据文件类型使用不同的 kafka 主题。定义连接器属性时如何定义带有文件夹的存储桶
谢谢
apache-kafka - 运行 Kafka Connect S3 源连接器的 ClassNotFound 异常
我正在评估 Confluent Kafka S2 Source Connector 并遇到以下堆栈跟踪的问题:
连接器配置:
版本:
它可能是一个 JDK 错误:https ://bugs.openjdk.java.net/browse/JDK-8015099 。它已在 JDK 9+ 中修复。
Confluent docker 镜像confluentinc/cp-kafka-connect:5.2.4
使用 JDK8:
关于可能出错的任何其他想法?