问题标签 [s3-kafka-connector]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
146 浏览

parquet - 在Kafka连接S3连接器中使用Null内部数组处理消息

我正在使用带有 2 个连接器的 Kafka 连接:

  1. debezium 将数据从 Postgres 提取到 Kafka
  2. 用于将数据从 Kafka 保存到 S3 的 S3 连接器

运行时,我从 S3 连接器收到此错误

我找到了相关消息,其中包含以下内容:

如何处理此消息?

我尝试将以下内容添加到 S3 连接器配置中:

尝试跳过这些消息并将它们发送到 DLQ,但它似乎不起作用,并且任务因此错误而失败。我也在日志中看到了这一点:

但不确定我应该在哪里添加这个?作为连接器配置的一部分?

0 投票
1 回答
114 浏览

amazon-s3 - Confluent Kafka-to-S3 sink 自定义 s3 命​​名,便于分区

我正在使用 confluent 的 kafka-connect-s3 https://www.confluent.io/hub/confluentinc/kafka-connect-s3将我的 kafka 主题备份到 s3 。我希望能够使用 Athena 轻松查询这些数据,并将其正确分区以实现廉价/快速读取。

我想按(年/月/日/主题)元组进行分区。我已经通过使用每日分区器https://docs.confluent.io/kafka-connect-s3-sink/current/index.html#partitioning-records-into-s3-objects解决了年/月/日部分。现在 year=YYYY/month=MM/day=DD 已进入路径,因此任何基于配置单元的查询都会按时优化/分区。查看 msck 解释,注意使用示例userid=

https://docs.aws.amazon.com/athena/latest/ug/msck-repair-table.html

但是,根据这些文档https://docs.confluent.io/kafka-connect-s3-sink/current/index.html#s3-object-names我在路径中得到 {topic} 但无法修改它到主题={主题}。我可以将其添加到前缀中(而不是 env={env},前缀将是 env={env}/topic={topic}),但这对于它下面的另一个唯一子目录 {topic} 来说似乎是多余的。

我还注意到主题名称在由 + 分隔的消息名称中(以及分区和起始偏移量)。

我的问题 。. . 如何在我的路径中获取 topic={topic} 以便基于配置单元的查询自动创建该分区?或者我是否已经通过在路径中(没有主题=)或消息名称(再次,没有主题=)中免费获得它

0 投票
1 回答
341 浏览

apache-kafka - Confluent S3 sink 连接器中的解析问题 [序列化错误]

我正在使用带有融合 kafka 代码的融合 s3 接收器连接器,用于基本 kafka-connect(v5.2.1)。

最初,MySQL cdc 以 JSON 格式(使用 maxwell)写入 kafka 主题(未写入模式)。这个 kafka 连接器从上面的 apache kafka 集群中读取数据并将其写入 s3。

我正在使用具有以下连接器配置的分布式连接器:

}

虽然这通常有效,但我一直看到这个错误。

在 Jackson 中出现解析错误。此数据来自 MySQL 的 cdc 事件。所以不可能改变这些数据(没有破坏性的改变)。如果 kafka 正在接受 json 数据,那么它对于 kafka-connector 也应该是合法的。

0 投票
1 回答
412 浏览

json - Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保留在 S3 中。

背景

官方S3-Sink-Connector支持 Parquet 输出格式,但是:

对于此连接器,您必须将 AvroConverter、ProtobufConverter 或 JsonSchemaConverter 与 ParquetFormat 一起使用。尝试使用 JsonConverter(带或不带模式)会导致 NullPointerException 和 StackOverflowException。

如果消息不是使用 JSON Schema serialization 写入的,则 JsonSchemaConverter 会抛出错误

问题陈述

因此,我正在寻找一种方法来从最初以 JSON 格式编写的 Kafka 主题读取消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入将以 Parquet 格式写入 S3 的 S3 连接器。

或者,鉴于主要要求(获取 Kafka 消息,将其作为 Parquet 文件放入 S3),我也愿意接受替代解决方案(-不涉及编写 JAVA 代码)。谢谢!

PS:不幸的是,改变这些 Kafka 消息最初的编写方式(例如使用带有Schema Discovery的JSON Schema 序列化)目前不是我的选择。

0 投票
0 回答
19 浏览

apache-kafka - 最佳 Kakfa Connect 每小时 S3AvroSink 配置

该主题的数量约为 739,180,大小为 1.1Gb

我不完全确定我的配置是否完全正确,如果我可以改进它。我想在两种情况下刷新,每小时或大小达到 5gb。

0 投票
0 回答
76 浏览

apache-kafka - Kafka & Connect - 如何修复 AVRO Schema 数据类型

设置

多个独立的源系统将 AVRO 事件推送到 Kafka 主题中。Kafka S3 接收器连接器从该主题读取 AVRO 事件并写入 S3 parquet 格式。

问题

我们模式注册表中的 AVRO 模式不符合标准。例如,源系统中的十进制字段在模式注册表中具有基本类型字符串和逻辑类型十进制。这些类型的组合在 AVRO 中是不允许的(十进制逻辑类型必须始终具有基本类型修复/字节。

这些不正确的 AVRO 模式会导致不正确的 PARQUET 文件模式。例如,在镶木地板中,十进制字段的类型为字符串,并且丢失了有关其十进制格式的所有详细信息。

问题

在模式注册表中拥有正确 AVRO 类型的最佳解决方案是什么?我们无法更新源系统以发送正确的类型。

我们是否应该使用带有自定义逻辑的 SMT 来处理逻辑类型?例如,通过搜索十进制逻辑类型并相应地更改基本类型和值?还是应该使用 KStream 或自定义序列化器/反序列化器而不是 SMT?还有哪些其他可用选项?

0 投票
1 回答
166 浏览

apache-kafka - Confluent Kafka S3 sink 连接器在使用 Parquet 格式时抛出`java.lang.NoClassDefFoundError: com/google/common/base/Preconditions`

使用 Confluent S3 sink 连接器时,会发生以下错误:

这发生在 5.5、10.0.0 和 10.0.1 上。

它只发生在 Parquet 上,而 Arvo 工作正常。

日志显示分区程序和源数据格式工作正常。

该连接器是从 Confluent 网站手动下载的。

0 投票
2 回答
74 浏览

confluent-platform - Confluent connect 5.5.1 抛出异常:线程 kafka-coordinator-heartbeat-thread 中的 java.lang.OutOfMemoryError UncaughtExceptionHandler |

我有一个包含多个子集群的大型 Confluent Kafka 集群,一个用于 Zookeeper,另一个用于具有 Schema Registry 和 KSQL 流的 Kafka 代理,一个用于 Connect。

我的连接集群出现问题,因为我已根据此处的文章将所有工作实例的 rest.advertised.host.name 配置为 FQDN -

以下是我在所有节点上的连接分布式日志文件中不断看到的错误 -

connectDistributed.out

错误 1-

错误 2-

以下是连接工作人员属性 -

我确信每个工人都有 6GB 分配给它 -

查看过程跟踪 -

请帮助如何解决这个问题?

0 投票
0 回答
40 浏览

confluent-platform - 如何在融合 6.2.0 中运行 kafka s3 sink 连接器

在我的 3 个 kafka 节点中安装了confluent 6.2.0 ,还在 3 个节点中安装了confluentinc-kafka-connect-s3-10.0.1并修改了quickstart-s3.properties但不知道如何启动它...

0 投票
0 回答
207 浏览

amazon-s3 - kafka,s3-sink 连接器无法获取架构版本 ID - 用户无权执行:glue:GetSchemaVersion

当我尝试使用 debezium 将数据加载到 Kafka 主题中时,我使用 debezium 为 CDC 设置了 Kafka、Kafka-connect 集群和胶水模式注册表,它工作正常,但接收器连接器失败,并出现以下插入错误从 Kafka 主题到 S3 存储桶的数据。

例如:连接器配置:

我收到以下错误:

错误转换主题“second_mongo_conn.chakra_dev.xiomileads”分区 0 中偏移量 0 和时间戳 1632977820814 中的消息值:由于序列化错误,将字节 [] 转换为 Kafka Connect 数据失败:(org.apache.kafka.connect.runtime.WorkerSinkTask:542 ) org.apache.kafka.connect.errors.DataException:由于序列化错误,将字节 [] 转换为 Kafka Connect 数据失败:在 com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:165)

.. .. 引起:com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException:无法在 com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionResponse( AWSSchemaRegistryClient.java:226) .. .. 引起:software.amazon.awssdk.services.glue.model.AccessDeniedException:用户:arn:aws:sts::409503125457:assumed-role/smtp_role/i-0e4e544391277a376 未授权执行:glue:GetSchemaVersion(服务:Glue,状态代码:400,请求 ID:7fe5f384-f9a7-442c-824b-00761e729730,扩展请求 ID:null)