问题标签 [apache-kafka-connect]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 如何在 YARN 中运行 Kafka 连接工作者?
我正在玩Kafka-Connect。我已经HDFS connector
在独立模式和分布式模式下工作。
他们宣传可以通过管理工作人员(负责运行连接器)YARN
但是,我还没有看到任何描述如何实现此目标的文档。
我该如何去YARN
处决工人?如果没有具体的方法,是否有关于如何让应用程序在其中运行的通用方法YARN
?
我已经YARN
与 SPARK 一起使用过spark-submit
,但是我无法弄清楚如何让连接器在YARN
.
apache-kafka - Kafka Connect:运行连接器和消费者后未更新 test.sink.txt
我尝试使用以下步骤将数据输入到 Kafka 连接测试主题中。我能够使用主题一次,并且在第二次尝试使用新文件时,我无法使用数据。输出文件test.sink.txt
未更新
echo -e "foo\nbar" > test.txt
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
apache-kafka - 扩展 Kafka Connect SinkTask 并从给定的偏移量开始消费
我想扩展SinkTask
以创建自己的接收器连接器。
如果我在刷新期间保存偏移量,并且下次启动接收器连接器时我想从保存的偏移量中恢复读取,那么正确的方法是什么?
我尝试使用SinkTaskContext
被覆盖initialize(SinkTaskContext context)
的来分配我自己的偏移量:
但这不起作用,因为尚未分配分区。我遇到了一个例外。
然后我是否应该将上下文 (from initialize()
) 保存到全局变量中,然后使用它在方法内部分配偏移量open(Collection<TopicPartition> partitions)
(overridden from SinkTask
),就像我在里面做的那样initialize
?例如:
cassandra - 如何配置从 kafka 到 cassandra 的 kafkaConnect
我想设置从 kafka 主题到 cassandra 的 kafka 连接
问题很简单:说我在kafka中有一个带有json数据的演示主题,例如
我想自动将其推送到 cassanra 表中,其中包含 id、name、clicks 列。
我正在研究kafka-connect-cassandra,但我能找到的唯一示例是从 cassandra 读取并通过中间的 kafka 写入另一个 cassandra 表。
我的问题是如何让它从 kafka 而不是 cassandra 中读取?我正在寻找一些连接器开源并带有示例。
hadoop - 使用 Kafka HDFS Connect 写入 HDFS 时出错
我正在尝试使用 kafka HDFS 连接器将 avro 格式的数据从我的 Java 代码写入 Kafka 到 HDFS,但我遇到了一些问题。当我使用 confluent 平台网站上提供的简单架构和数据时,我能够将数据写入 HDFS,但是当我尝试使用复杂的 avro 架构时,我在 HDFS 连接器日志中收到此错误:
我正在使用融合平台 3.0.0
我的Java代码:
架构(这是从 avdl 文件创建的):
json文件:
这似乎是架构的问题,但我无法确定问题所在。
apache-kafka - 用于 Azure Blob 存储的 Kafka 连接器
我需要将推送到 Kafka 的消息存储在深度存储中。我们正在使用 Azure 云服务,所以我认为 Azure Blob 存储可能是一个更好的选择。我想使用 Kafka Connect 的接收器连接器 API 将数据推送到 Azure Blob。Kafka 文档大多建议使用 HDFS 导出数据,但在这种情况下,我需要一个运行 Hadoop 的 Linux VM,我猜这会很昂贵。我的问题是 Azure Blob 存储是存储 JSON 对象的合适选择,而构建自定义接收器连接器是这种情况下的合理解决方案吗?
java - Kafka 连接 API 客户端
试图将数据从 kafka 写入 hdfs。任何地方都没有记录如何使用 Confluent 的 kafka-connect-hdfs Java API。
apache-kafka - Kafka Connect 接收器任务中多久触发一次 put()?
put()
我可以控制触发 Kafka Connect Sink 任务方法的时间间隔吗?Kafka Connect 框架在这方面的预期行为是什么?理想情况下,我想指定,例如,“不要打电话给我,除非你有 X 个新记录/Y 个新字节,或者自上次调用以来经过了 Z 毫秒”。这可能会使接收器任务中的批处理逻辑更简单(引用文档,“在许多情况下,内部缓冲将很有用,因此可以一次发送整批记录,从而减少将事件插入下游数据存储的开销) .
apache-kafka - Kafka 连接教程停止工作
我在此链接上执行了步骤 #7(使用 Kafka Connect 导入/导出数据):
http://kafka.apache.org/documentation.html#quickstart
在我删除“test.txt”文件之前,它运行良好。主要是因为这就是 log4j 文件的工作方式。一段时间后,该文件将被轮换-我的意思是-它将被重命名,并且将开始写入具有相同名称的新文件。
但之后,我删除了“test.txt”,连接器停止工作。我重新启动了连接器、代理、zookeeper 等,但来自“test.txt”的新行不会进入“connect-test”主题,因此不会进入“test.sink.txt”文件。
我怎样才能解决这个问题?
apache-kafka - Kafka Connect HDFS 接收器问题
我正在尝试使用带有 HDFS 接收器连接器的 Kafka-Connect 流式传输数据。Standalone 和 Distributed 模式都运行良好,但它只写入 HDFS 一次(基于刷新大小)并且以后不会流式传输。如果我遗漏了一些东西,请帮忙。
汇合 2.0.0 和卡夫卡 0.9.0