问题标签 [confluent-platform]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
11387 浏览

java - 如何让 KafkaProducer 使用模拟模式注册表进行测试?

KafkaProducer在我的测试用例中使用,我的生产者使用schemaRegistryUrl指向我本地实例的Schema Registry. 有没有办法模拟KafkaProducer与模式注册表的连接方式?也就是说,KafkaProducer/Consumer在我的测试中没有正在运行的 Schema Registry 实例。

0 投票
1 回答
538 浏览

jms - Kafka 连接器 - 用于 Kafka 主题的 JMSSourceConnector

Confluent 默认情况下是否为 Kafka 主题提供此 JMSSourceConnector。

或者我们需要为此编写自定义连接器?

我在 Confluent 页面上没有看到任何文档。

0 投票
1 回答
26196 浏览

apache-kafka - 融合平台与 apache kafka

我是 kafka 的新手,我对 Confluent 平台很好奇。看来 Confluent 平台上的用户故事并不多。Confluent 平台和 Apache Kafka 有什么区别?

0 投票
1 回答
1210 浏览

cassandra - Confluent 和 Cassandra:获取 DataException:无法将数据反序列化为 Avro,未知魔术字节

我按照http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/的教程进行操作,并且能够将数据从 avro 控制台插入到 cassandra。现在我正在尝试扩展它以使用水槽,并且我在我的机器中设置了水槽,它将选择日志文件并将其推送到 kafka,尝试将我的数据插入到 cassandra 数据库。在文本文件中,我将数据

{“id”:1,“创建”:“2016-05-06 13:53:00”,“产品”:“OP-DAX-P-20150201-95.7”,“价格”:94.2}

{“id”:2,“created”:“2016-05-06 13:54:00”,“product”:“OP-DAX-C-20150201-100”,“price”:99.5}

{“id”:3,“创建”:“2016-05-06 13:55:00”,“产品”:“FU-DATAMOUNTAINEER-20150201-100”,“价格”:10000}

{“id”:4,“创建”:“2016-05-06 13:56:00”,“产品”:“FU-KOSPI-C-20150201-100”,“价格”:150}

Flume 正在挑选这些数据并将其推送到 kafka。

在 cassandra sink 中,我面临一个错误,

run(WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent .ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 由:org。 apache.kafka.common.errors.SerializationException:为 id -1 反序列化 Avro 消息时出错 原因:org.apache.kafka.common.errors.SerializationException:未知的魔法字节![2016-09-28 15:47:00,951] 错误任务正在被杀死,并且在手动重新启动之前不会恢复(org.apache.kafka.connect.runtime.WorkerTask:143)[2016-09-28 15:47:00,951 ] 信息 停止 Cassandra 接收器。(com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79) [2016-09-28 15:47:00,952] 信息关闭 Cassandra 驱动程序会话和集群。(com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:165)

我正在使用的架构

水槽的配置:Flume-kafka.conf.properties

谁能帮助我,如何解决这个错误?

0 投票
0 回答
583 浏览

hadoop - kafka HDFS 连接器在多 DC 设置中连接到私有 IP 而不是主机名

我有 2 个集群:

  • 一个在房子里有汇合的 (3.0.0-1)
  • AWS中的一个,带有hadoop(hdp 2.4)

我正在尝试使用 hdfs 连接器从 confluent 写入 hadoop。

长话短说:连接器尝试连接到 hadoop 集群的私有 IP,而不是使用主机名。在内部集群上,/etc/hosts 已更新以将内部 hadoop 主机名解析为相关的公共 IP。

我正在使用分布式连接器,我有一堆连接器 JSON 文件,如下所示:

并且工人被定义为:

几点注意事项:

  • /kafka-connect 存在于 hdfs 上,世界可写
  • 3 个主题 (*.storage.topic) 确实存在
  • 我有一个工作人员在每 (3) 个带有 kafka 代理的服务器上运行(在所有代理上都有一个模式注册表、rest API 和 zookeeper 服务器)
  • 我已将 dfs.client.use.datanode.hostname 设置为 true,并且此属性在 $HADOOP_HOME/hdfs-site.xml 中的客户端上设置

我看到创建了 /kafka-connect 的子目录以及配置单元元数据。当我启动连接器时,消息是:

createBlockOutputStream (org.apache.hadoop.hdfs.DFSClient:1471) org.apache.hadoop.net.ConnectTimeoutException 中的信息异常:等待通道准备好连接时出现 60000 毫秒超时。ch : java.nio.channels.SocketChannel [在 org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533) 在 org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java: 1610) 在 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.DFSOutputStream(DFSOutputStream.java:1361) 在 org.apache.hadoop 的 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408) .hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588) 信息放弃 BP-429601535-10.0.0.167-1471011443948:blk_1073742319_1495 (org.apache.hadoop.hdfs.DFSClient:

关于如何解决这个问题的任何想法?看起来融合直接接收 IP,而不是主机名。

0 投票
1 回答
189 浏览

apache-kafka - 使用 2.10 包的融合构建包含 scala 2.11 jar

我下载了 confluent-2.0.0-2.10.5.tar.gz,因为我想要 scala 2.10 包

但 /share/java/schema-registry 中的 kafka jar 仍然是 kafka_2.11-0.9.0.0-cp1.jar

无论如何我可以获得一个干净的 2.10 scala confluent 包

0 投票
2 回答
556 浏览

apache-kafka - 添加额外的 Schema Registry 层来减轻权重的好处?

在生成/消费者消息时添加模式注册表的附加层(也称为故障点)有什么好处吗?如果服务出现故障,则不会使用或生成消息。使用 Kafka 的系统不会因为不使用 Schema Registry 而减少出错的可能性吗?

0 投票
4 回答
2707 浏览

apache-kafka - 如何在 Windows 上运行 kafka rest 代理

如何在 Windows 上运行 kafka rest 代理。

我下载了 confluent-2.0.1-2.11.7.ta​​r.gz

在 Windows 文件夹中,我看不到 kafka-rest-start。

0 投票
2 回答
3000 浏览

java - Spring Cloud Stream Kafka > 使用来自 Confluent REST 代理的 Avro 消息

我有以下情况:

我的应用程序如下所示:

而 MyMessage 是 Avro 从 Avro 模式创建的类。

我的 application.properties 看起来像这样:

我现在的问题是每次收到新消息时,都会抛出以下异常:

据我了解,问题在于 Confluent 堆栈包含消息模式的 ID 作为消息有效负载的一部分,并且客户端应该在模式 ID 之后开始读取实际的 Avro 消息。看来我需要配置 Kafka 绑定以使用 Confluent 的 KafkaAvroDeserializer,但我不知道如何实现这一点。

(我可以使用 Confluent 的 avro 控制台消费者完美地检索消息,因此 Avro 编码似乎不是问题)

我还尝试使用 @EnableSchemaRegistry 注释并配置 ConfluentSchemaRegistryClient bean,但在我看来,这仅控制模式的存储/检索位置,而不是实际的反序列化。

这甚至应该以某种方式工作吗?

0 投票
1 回答
189 浏览

apache-kafka - 融合 kafka rest avrò 制作人

我使用 confluent kafka-rest 生成 avro 消息,当我使用 rest 端点消费它时,返回格式是这样的。

我想知道kafka中的消息实际上是这种格式还是消费者端点添加了“密钥”和“分区”。如果是前一个会很奇怪,因为在这种情况下,当您执行生产请求时响应有效负载 avro 模式是不同的(与 ACTUAL_PAYLOAD 匹配的模式)