问题标签 [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1684 浏览

apache-kafka - 如何在 YARN 中运行 Kafka 连接工作者?

我正在玩Kafka-Connect。我已经HDFS connector独立模式分布式模式下工作。

他们宣传可以通过管理工作人员(负责运行连接器)YARN 但是,我还没有看到任何描述如何实现此目标的文档。

我该如何去YARN处决工人?如果没有具体的方法,是否有关于如何让应用程序在其中运行的通用方法YARN

我已经YARN与 SPARK 一起使用过spark-submit,但是我无法弄清楚如何让连接器在YARN.

0 投票
1 回答
1507 浏览

apache-kafka - Kafka Connect:运行连接器和消费者后未更新 test.sink.txt

我尝试使用以下步骤将数据输入到 Kafka 连接测试主题中。我能够使用主题一次,并且在第二次尝试使用新文件时,我无法使用数据。输出文件test.sink.txt未更新

  1. echo -e "foo\nbar" > test.txt

  2. bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

  3. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning

0 投票
1 回答
876 浏览

apache-kafka - 扩展 Kafka Connect SinkTask 并从给定的偏移量开始消费

我想扩展SinkTask以创建自己的接收器连接器。

如果我在刷新期间保存偏移量,并且下次启动接收器连接器时我想从保存的偏移量中恢复读取,那么正确的方法是什么?

我尝试使用SinkTaskContext被覆盖initialize(SinkTaskContext context)的来分配我自己的偏移量:

但这不起作用,因为尚未分配分区。我遇到了一个例外。

然后我是否应该将上下文 (from initialize()) 保存到全局变量中,然后使用它在方法内部分配偏移量open(Collection<TopicPartition> partitions)(overridden from SinkTask),就像我在里面做的那样initialize?例如:

0 投票
2 回答
4680 浏览

cassandra - 如何配置从 kafka 到 cassandra 的 kafkaConnect

我想设置从 kafka 主题到 cassandra 的 kafka 连接

问题很简单:说我在kafka中有一个带有json数据的演示主题,例如

我想自动将其推送到 cassanra 表中,其中包含 id、name、clicks 列。

我正在研究kafka-connect-cassandra,但我能找到的唯一示例是从 cassandra 读取并通过中间的 kafka 写入另一个 cassandra 表。

我的问题是如何让它从 kafka 而不是 cassandra 中读取?我正在寻找一些连接器开源并带有示例。

0 投票
1 回答
786 浏览

hadoop - 使用 Kafka HDFS Connect 写入 HDFS 时出错

我正在尝试使用 kafka HDFS 连接器将 avro 格式的数据从我的 Java 代码写入 Kafka 到 HDFS,但我遇到了一些问题。当我使用 confluent 平台网站上提供的简单架构和数据时,我能够将数据写入 HDFS,但是当我尝试使用复杂的 avro 架构时,我在 HDFS 连接器日志中收到此错误:

我正在使用融合平台 3.0.0

我的Java代码:

架构(这是从 avdl 文件创建的):

json文件:

这似乎是架构的问题,但我无法确定问题所在。

0 投票
2 回答
4850 浏览

apache-kafka - 用于 Azure Blob 存储的 Kafka 连接器

我需要将推送到 Kafka 的消息存储在深度存储中。我们正在使用 Azure 云服务,所以我认为 Azure Blob 存储可能是一个更好的选择。我想使用 Kafka Connect 的接收器连接器 API 将数据推送到 Azure Blob。Kafka 文档大多建议使用 HDFS 导出数据,但在这种情况下,我需要一个运行 Hadoop 的 Linux VM,我猜这会很昂贵。我的问题是 Azure Blob 存储是存储 JSON 对象的合适选择,而构建自定义接收器连接器是这种情况下的合理解决方案吗?

0 投票
2 回答
1072 浏览

java - Kafka 连接 API 客户端

试图将数据从 kafka 写入 hdfs。任何地方都没有记录如何使用 Confluent 的 kafka-connect-hdfs Java API。

0 投票
1 回答
925 浏览

apache-kafka - Kafka Connect 接收器任务中多久触发一次 put()?

put()我可以控制触发 Kafka Connect Sink 任务方法的时间间隔吗?Kafka Connect 框架在这方面的预期行为是什么?理想情况下,我想指定,例如,“不要打电话给我,除非你有 X 个新记录/Y 个新字节,或者自上次调用以来经过了 Z 毫秒”。这可能会使接收器任务中的批处理逻辑更简单(引用文档,“在许多情况下,内部缓冲将很有用,因此可以一次发送整批记录,从而减少将事件插入下游数据存储的开销) .

0 投票
1 回答
1667 浏览

apache-kafka - Kafka 连接教程停止工作

我在此链接上执行了步骤 #7(使用 Kafka Connect 导入/导出数据):

http://kafka.apache.org/documentation.html#quickstart

在我删除“test.txt”文件之前,它运行良好。主要是因为这就是 log4j 文件的工作方式。一段时间后,该文件将被轮换-我的意思是-它将被重命名,并且将开始写入具有相同名称的新文件。

但之后,我删除了“test.txt”,连接器停止工作。我重新启动了连接器、代理、zookeeper 等,但来自“test.txt”的新行不会进入“connect-test”主题,因此不会进入“test.sink.txt”文件。

我怎样才能解决这个问题?

0 投票
1 回答
811 浏览

apache-kafka - Kafka Connect HDFS 接收器问题

我正在尝试使用带有 HDFS 接收器连接器的 Kafka-Connect 流式传输数据。Standalone 和 Distributed 模式都运行良好,但它只写入 HDFS 一次(基于刷新大小)并且以后不会流式传输。如果我遗漏了一些东西,请帮忙。

汇合 2.0.0 和卡夫卡 0.9.0