问题标签 [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
6905 浏览

apache-kafka - 如何在 Kafka-Connect API 中设置 max.poll.records

我正在使用 confluent-3.0.1 平台并构建 Kafka-Elasticsearch 连接器。为此,我正在扩展 SinkConnector 和 SinkTask(Kafka 连接 API)以从 Kafka 获取数据。

作为此代码的一部分,我正在扩展 SinkConnector 的 taskConfigs 方法以返回“max.poll.records”以一次仅获取 100 条记录。但它不起作用,我同时获得所有记录,我未能在规定的时间内提交偏移量。请任何人帮我配置“max.poll.records”

0 投票
1 回答
1043 浏览

apache-kafka - 如何更新正在运行的 kafka 连接器

我在 Marathon 容器中运行了 kafka conenct。如果我想更新连接器插件(jar),我必须上传新的,然后重新启动 Connect 任务。

是否可以在不重新启动/停机的情况下做到这一点?

0 投票
2 回答
1309 浏览

java - Kafka connect confluent elasticsearch sink(找不到类错误)

我对 Kafka 连接非常陌生。我想将我的消息从 Kafka 主题推送到 elasticsearch。遵循可用文档后..我从发行版 tar.zip ( https://github.com/confluentinc/kafka-connect-elasticsearch/releases )下载并编译了弹性搜索接收器

我添加了弹性搜索属性文件并将上面的 jar 包含在类路径中。当我以独立模式运行 kafka connect 时,出现此错误

./usr/bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

NoClassDefFoundError:io/searchbox/client/JestClientFactory

我检查了 pom.xml,它正确定义了 Jest 客户端依赖项。我错过了什么吗?

任何指针将不胜感激。

谢谢, 拉杰什

0 投票
1 回答
81 浏览

apache-kafka - 如何设计既是 Sink 又是 Source 的 Kafka Connect

我正在开发一个订阅主题、转换消息、将转换后的消息推回另一个主题的 Kafka-Connector。

到目前为止,我将它作为一个SinkTask类来实现,每个任务都执行 ETL,实例化发布者对象,将消息写回另一个主题。

有没有更清洁的方法来实现它?我相信这是一个常见的用例,即连接器既是接收器也是源。

0 投票
1 回答
4893 浏览

apache-kafka - Kafka Connect 有哪些可用的 REST API?

我遇到了 2 个关于 Kafka Connect 的 REST API 的链接。https://kafka.apache.org/documentation#connect中还有一些其他的。GET /connectors/{name}/status使用或时出现 404 错误POST /connectors/{name}/restart。但是这些服务没有在http://docs.confluent.io/2.0.0/connect/userguide.html#rest-interface上列出。

我们如何检查连接器是否正常运行?

先感谢您!

0 投票
1 回答
221 浏览

kafka-consumer-api - KafkaStream 没有收到来自 Topic 的任何消息

我正在玩弄KafkaStreamsKafkaConnect只是尝试使用来自主题的消息。我为此主题设置了一个“标准”批量消费者,它就像一个魅力。我首先将几条记录发送到 Kafka,然后再使用它们。现在我想使用 Kakfa 流做同样的事情,但我没有从主题中得到任何消息。这是我正在使用的消费者代码。

我的问题是,我的代码一直处于等待it.hasNext()状态,直到达到超时。我可能在这里遗漏了一些细节,但不知道为什么我没有从这个话题中得到任何东西。作为这个测试的一部分,我有一个生产者在消费者开始之前将一些记录发送到这个主题,所以它不可能是一个偏移问题。任何想法都将受到高度欢迎。

0 投票
2 回答
1021 浏览

elasticsearch - Kafka Connect 的 Elasticsearch 连接器 - 偏移量和时间戳

我正在为 kafka 连接使用弹性搜索连接器(汇合)。我正在从主题获取消息到弹性搜索索引。如果没有使用任何键,我可以看到偏移量是 _id 的一部分。我想根据偏移量和时间戳浏览 elasticseach 中的消息。

是否可以将消息的偏移量和时间戳作为索引中的字段?

这是弹性搜索文档

{ "_index": "test-elasticsearch-sink", "_type": "kafka-connect", "_id": "test-elasticsearch-sink+0+0", "_score": 1, "_source": { "f1": "value1" } }

谢谢, 拉杰什

0 投票
1 回答
695 浏览

hadoop - 带有 kafka-connect 的多个 hive 分区

在此过程中,我一直在尝试使用 kafka-connect 通过配置单元集成将数据流式传输到 HDFS。

我的用例要求我使用“FieldPartioner”作为分区器类。

我的问题是,我无法获得多个分区。

例子:

我的示例 JSON

我想根据“mydate”和“hour”进行分区

我尝试了以下

还尝试将 partition.field.name 指定为

以及更多这样的组合

对此问题的任何帮助将不胜感激

谢谢。

0 投票
1 回答
1275 浏览

apache-kafka - Kafka连接消息排序

Kafka Sink 连接器如何在从分区获取消息时确保消息排序。我有多个分区,并且在使用每个分区的散列键发布消息时确保了消息排序。现在,当多个 Sink 任务(及其工作人员)从多个 JVM 扩展并负责从同一分区获取消息并通过 HTTP 通知目标系统时,我如何保证目标系统将按顺序接收消息.

0 投票
4 回答
4550 浏览

apache-kafka - Kafka 连接或 Kafka 客户端

我需要从 Kafka 主题中获取消息并通过基于 HTTP 的 API 通知其他系统。也就是说,从主题获取消息,映射到第 3 方 API 并调用它们。我打算为此编写一个 Kafka Sink Connector。

对于这个用例,Kafka Connect 是正确的选择,还是我应该使用 Kafka Client。