问题标签 [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1716 浏览

apache-kafka - 分布式 Kafka Connect 主题配置

我遇到的问题是我的 Kafka Connect 工作程序配置在节点重新启动后丢失。( http://broker:port/connectors/ -> 空数组)

现在我认为它可能与“retention.ms”配置有关。因为connect config也存储在“config.storage.topic”中,在“retention.ms”后会被删除?所以我必须设置一个非常高的“retention.ms”。这是正确的还是由 Kafka 自动管理的?(如果您自己创建主题)

其他两个主题怎么样: status.storage.topic - 只有当前状态信息,不是那么重要?offset.storage.topic

0 投票
0 回答
613 浏览

hive - 当数据从 oracle 传输到 Hive 时,confluent 格式会发生变化

我在使用 confluent 时遇到问题,我创建了一个以 ID 作为增量列的 jdbc 源,以及一个 HDFS 接收器来将数据写入 Hive。

毕竟,我使用Hive浏览Hive中的数据,我可以看到Oracle中为“int”的ID列在Hive中变成了“binary”,而Oracle中为“date”的时间列是在 Hive 中变成了“时间戳”。

以下是模式寄存器的详细信息:

这是我的 schema-registry.properties:</p>

这是我的 connect-avro-distributed.properties:

这是怎么发生的?我该如何解决这个问题?这是模式注册表配置的问题吗?

此致,

飞然

0 投票
1 回答
20589 浏览

apache-kafka - 如何使用 kafka0.10.x 获取所有组列表

当我使用 kafka0.8.x 时,我可以通过路径获取所有带有 zookeeper 的组列表/consumers/group_id,因为偏移量将在 0.8.x 中提交给 zookeeper。

但是在 0.10.x 版本中,提交给 kafka 代理的偏移量就像路径一样/brokers/topics/__consumer_offsets,而 zookeeper 中的路径/consumers什么都没有。所以我无法通过zookeeper获取消费者组列表。

0 投票
2 回答
7643 浏览

elasticsearch - Kafka-Connect:在分布式模式下创建新连接器就是创建新组

我目前正在使用 confluent 3.0.1 平台。我正在尝试在两个不同的工作人员上创建 2 个连接器,但尝试创建一个新的连接器正在为它创建一个新组。

但它们都是在不同的组 ID 下创建的。在此之后,我查询了现有的组。

这些组是由 Kafka connect 自动创建的,不是我提供的。我在worker.properties 中给出了不同的group.id。但我希望两个连接器都在同一个组下,以便它们并行工作以共享消息。截至目前,我有 100 万个关于“dev.ps_primary_delivery”主题的数据,我希望两个连接器各获得 50 万个数据。

请让我知道如何做到这一点。

0 投票
1 回答
269 浏览

scala - 将不同的 Kafka Connect 实例发送到不同的 Kafka 主题

我曾尝试使用一名工作人员以分布式模式将 Kafka Connnect 实例的信息发送到特定主题,我在启动实例时使用的“archive.properties”文件中有主题名称。

但是,当我发送五个或更多实例时,我看到消息合并到所有主题中。

我认为的“解决方案”是制作一张地图来存储 ID 和主题之间的关系,但它不起作用

是否有特定的 Kafka 连接实现来执行此操作?

谢谢。

0 投票
1 回答
415 浏览

apache-kafka - 无法使用 Confluent 平台将 Avro 数据推送到 HDFS

我有一个系统将 Avro 数据推送到多个 Kafka 主题中。
我想将该数据推送到 HDFS。我遇到了融合,但不确定如何在不启动的情况下将数据发送到 HDFS kafka-avro-console-producer

我执行的步骤:

  1. 我有自己的 Kafka 和 ZooKeeper 正在运行,所以我刚刚启动了融合的模式注册表。

  2. kafka-connect-hdfs在更改主题名称后开始。这一步也是成功的。它能够连接到 HDFS。

在此之后,我开始将数据推送到 Kafka,但消息没有被推送到 HDFS。

请帮忙。我是 Confluent 的新手。

0 投票
1 回答
499 浏览

amazon-web-services - Kafka Connect 与 AWS Hadoop 实例的托管

对于生产类型的设置,将 TB 的记录写入 KAFKA 主题,使用 KAFKA 连接 - HDFS 连接器的最佳实践是什么?

我的 kafka 实例在 AWS 主机名 abcd 上运行,我的 hadoop 名称节点在 AWS 主机名 pqrs 上。出于开发/POC 的目的,我们在同一个盒子中保持融合,因为我们在 abcd 上运行 kafka 实例。HDFS 集群大小为 500GB。

但是对于集群大小为 20-30 TB 的生产类型设置,是否建议在与 KAFKA 实例相同的盒子或 Namenode 盒子或单独的盒子中保持融合?在这样的生产案例中,融合需要多少单独的磁盘大小?

0 投票
0 回答
54 浏览

elasticsearch - 如何将唯一 ID 从 Kafka 移动到弹性搜索?

如何将唯一 ID 从 Kafka 移动到弹性搜索?

我已经使用了从 kafka 到 Elasticsearch 的 Elasticsearch 连接器,但它正在发送整个数据。我只需要将唯一 ID 从 kafka 发送到 ES

0 投票
1 回答
1318 浏览

java - Kafka Connect S3 sink 在加载 Avro 时抛出 IllegalArgumentException

我正在使用qubole 的 S3 接收器将 Avro 数据以 Parquet 格式加载到 S3 中。

在我的 Java 应用程序中,我创建了一个生产者

然后将 a 转换GenericRecordbyte[]格式:

我在 Kafka Connect 属性中使用以下值:

以及我的文件接收器属性中的以下配置选项:

当我运行连接器时,我收到以下错误消息:'java.lang.IllegalArgumentException:Avro 模式必须是记录'。

我对 Kafka Connect 很陌生,我知道可以设置 Schema Registry 服务器——但我不明白接收器是否需要注册表来将 Avro 数据转换为 Parquet,或者这是否是某种我的格式或配置问题。在此错误的上下文中,“记录”指的是哪种数据格式?任何方向或帮助将不胜感激。

0 投票
1 回答
371 浏览

elasticsearch - 是否有适用于 DC/OS、ElasticSearch、Kafka Connect 和 Kafka Streams 的 CloudFormation 模板?

有很多SMACK 堆栈的示例,但在我的基础架构中,我想使用 ElasticSearch 和 Confluent Kafka Connect 和 Kafka Streams。

有一个很棒的教程介绍了如何部署基于 CloudFormation 的 SMACK 堆栈环境,还有一个教程介绍了如何使用 SMACK 创建 IoT 管道

由于我正在研究Lambda 架构,因此我首先使用 ElasticSearch(不是 Cassandra)来处理我的批处理数据,并且想知道是否有使用 Kafka Connect、ElasticSearch 的 CloudFormation 模板。最终我们想将 Kafka Streams 与 InfluxDB 一起使用?