问题标签 [confluent-platform]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
18676 浏览

apache-kafka - 卡夫卡休息的例子

有没有使用 Java 中的 Kafka rest api 的生产者和消费者组的好例子。我不是在寻找生产者和消费者的 simpleconsumer 或 kafka 客户端示例。任何帮助表示赞赏。

0 投票
1 回答
1667 浏览

apache-kafka - Kafka 连接教程停止工作

我在此链接上执行了步骤 #7(使用 Kafka Connect 导入/导出数据):

http://kafka.apache.org/documentation.html#quickstart

在我删除“test.txt”文件之前,它运行良好。主要是因为这就是 log4j 文件的工作方式。一段时间后,该文件将被轮换-我的意思是-它将被重命名,并且将开始写入具有相同名称的新文件。

但之后,我删除了“test.txt”,连接器停止工作。我重新启动了连接器、代理、zookeeper 等,但来自“test.txt”的新行不会进入“connect-test”主题,因此不会进入“test.sink.txt”文件。

我怎样才能解决这个问题?

0 投票
2 回答
2563 浏览

apache-kafka - **Kafka** 跨区域数据中心之间的双向同步

我有一个部署,我们使用 kafka 从服务发送消息。但是我们需要在所有区域都拥有 Kafka 大师。因此,一旦消息被推送到 1 个数据中心,就应该在其他数据中心同步。当它再次在其他数据中心完成时,它应该被同步回来。Mirror Maker 可以提供从 1 到其他的同步,但我如何实现双向同步?

0 投票
1 回答
811 浏览

apache-kafka - Kafka Connect HDFS 接收器问题

我正在尝试使用带有 HDFS 接收器连接器的 Kafka-Connect 流式传输数据。Standalone 和 Distributed 模式都运行良好,但它只写入 HDFS 一次(基于刷新大小)并且以后不会流式传输。如果我遗漏了一些东西,请帮忙。

汇合 2.0.0 和卡夫卡 0.9.0

0 投票
6 回答
10631 浏览

apache-kafka - 在 windows 中启动 Confluent Schema Registry

我有 windows 环境和我自己的一套 kafka 和 zookeeper 正在运行。为了使用自定义对象,我开始使用 Avro。但我需要启动注册表。下载 Confluent 平台并运行:

然后我在安装页面上看到了这个:

“Confluent 目前不支持 Windows。Windows 用户可以下载和使用 zip 和 tar 存档,但必须直接运行 jar 文件,而不是使用 bin/ 目录中的包装脚本。”

我想知道如何在 Windows 环境中启动融合模式注册表?

看了脚本的内容,很难破译。

谢谢

0 投票
4 回答
5591 浏览

apache-kafka - Confluent Schema Registry 集群模式

我正在使用来自 Confluent 的 Kafka Connect 来使用 Kafka 流并以 parquet 格式写入 HDFS。我在 1 个节点中使用 Schema Registry 服务,它运行良好。现在我想将 Schema Registry 分发到集群模式来处理故障转移。关于如何实现这一点的任何链接或片段都将非常有用。

0 投票
0 回答
93 浏览

apache-kafka - 如何将融合平台作为 linux 服务启动

我下载并尝试了 debian 软件包和 confluent 平台的 tarball 发行版。我没有找到任何可以作为 linux 服务运行的脚本。如何将此发行版中包含的所有工具作为 linux 服务运行?

谢谢

0 投票
1 回答
578 浏览

hadoop - Kafka 主题与 Kafka Connect 合并到 HDFS

是否可以配置 Kafka Connect 的 HDFS 连接器以将多个单独的主题写入/组合到一个文件中?

这些主题将包含具有相同 avro 模式的消息,我希望 KafkaConnect 充当这些 Kafka 主题和 HDFS 之间的中介。在最坏的情况下,主题内容可以在写入 HDFS 后进行组合,但我觉得 HDFS 连接器应该可以实现更简洁、更快捷的方式。

0 投票
1 回答
915 浏览

hadoop - Kafka Streams 在 HDFS 上查找数据

我正在使用 Kafka Streams (v0.10.0.1) 编写一个应用程序,并希望使用查找数据来丰富我正在处理的记录。该数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录。

如何在Kafka Streams应用程序中加载它并加入实际KStream
当新文件到达那里时,从 HDFS 重新读取数据的最佳做法是什么?

还是将Kafka ConnectRDBMS 表内容切换到所有 Kafka Streams 应用程序实例都可以使用的 Kafka 主题并将其写入会更好?

更新
正如建议的Kafka Connect将是要走的路。因为查找数据每天都会在 RDBMS 中更新,所以我考虑将 Kafka Connect 作为计划的一次性作业运行,而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销......等等。对我来说,在这种情况下进行预定提取看起来更安全。

查找数据不大,可能会删除/添加/修改记录。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中已删除什么。另外 AFAIK 当压缩发生时我没有控制权。

0 投票
1 回答
3320 浏览

scala - org.apache.spark.SparkException: 找不到 Set ([test-topic,0]) 的领导者偏移量

我尝试使用 Confluent 平台并使用此代码作为示例向 REST 端点发出高级 Kafka 请求。

我使用以下 Kafka 参数:

这是我尝试运行代码时遇到的错误。错误发生在以下行:

线程“主”org.apache.spark.SparkException 中的异常:java.nio.channels.ClosedChannelException org.apache.spark.SparkException:在 org.apache 上找不到 Set([test-topic,0]) 的领导者偏移量.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)在 scala.util.Either.fold(Either.scala:98) 在 org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) 在 org.apache.spark.streaming.kafka.KafkaUtils$ .getFromOffsets(KafkaUtils.scala:222) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at kafka.EventsConsumer$.delayedEndpoint$kafka$EventsConsumer$1(EventsConsumer.scala:53)在卡夫卡。EventsConsumer$delayedInit$body.apply(EventsConsumer.scala:22) at scala.Function0$class.apply$mcV$sp(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala: 12) 在 scala.App$$anonfun$main$1.apply(App.scala:76) 在 scala.App$$anonfun$main$1.apply(App.scala:76) 在 scala.collection.immutable.List.foreach (List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76) at kafka.EventsConsumer$.main(EventsConsumer .scala:22) 在 kafka.EventsConsumer.main(EventsConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl。com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 的 java.lang.reflect.Method.invoke(Method.java:498) 的调用(DelegatingMethodAccessorImpl.java:43)

更新:

我尝试更改localhost为IP,但仍然遇到同样的问题。