问题标签 [confluent-platform]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
185 浏览

rest - 如何配置 confluent kafka rest 序列化器?

是否可以更改融合的 kafka-rest 序列化程序?我想将融合的 avro 序列化程序交换为我的自定义序列化程序,但我检查了生产者池内部(https://github.com/confluentinc/kafka-rest/blob/2.x/src/main/java/io/confluent /kafkarest/ProducerPool.java)。

在构建 buildAvroProducer 时,它要求使用 KafkaAvroSerializer,这是一个融合库。所以似乎不可能在不更改代码的情况下对其进行自定义(比如简单地在类路径中交换 jar 或 sth)不确定这个要求是否公平,并且想知道它是否在路线图上。

0 投票
2 回答
5860 浏览

apache-kafka - 使用 Avro 转换器运行 Kafka Connect:ConfigException:“缺少架构注册表 url”

嗨,我正在运行 Kafka Connect docker 图像

并得到

不知道在哪里添加“schema.registry.url”配置!

0 投票
1 回答
2355 浏览

hadoop - 【HDFS connector + Kafka】如何在单机模式下编写多个topic?

我正在使用Confluent'sHDFS Connector将流式数据写入HDFS. 我按照用户手册快速入门并设置了我的连接器。当我只使用一个主题时,它可以正常工作。我的属性文件看起来像这样

当我添加多个主题时,我看到它不断提交偏移量并且我没有看到它写入提交的消息。

我尝试使用 tasks.max 和 1 和 2。我不断Committing offsets记录如下

当我优雅地停止服务 (Ctrl+C) 时,我看到它正在删除tmp文件。我究竟做错了什么?正确的方法是什么?感谢您对此的任何建议。

0 投票
0 回答
228 浏览

json - Confluent kafka rest 可选字段

将 avro 有效负载发布到 kafka-rest 服务时,如果 value_schema 中的字段之一使用默认值定义,并且在记录中省略此字段。似乎 kafka-rest 仍然坚持将其记录在案。这样做的正确方法是什么?

例如。

输出

0 投票
1 回答
1793 浏览

c# - rdkafka dotnet 库缓慢的消费者性能

现在我正在关注 OnMessage 事件处理程序的简单/高级消费者示例,性能很差。(每分钟 120k 条消息与(使用我们的 C++ 库每分钟 1M 条消息)

通过等待 OnMessage 事件而不是实现繁忙的循环并使用 .Consume(TimeSpan) 来消耗消息可能是延迟?

0 投票
0 回答
1061 浏览

java - 如何在不调用注册新模式的情况下填充 CachedSchemaRegistryClient 中的缓存?

我们有一个与 Kafka 集成的 spark 流应用程序,我正在尝试对其进行优化,因为它会过度调用 Schema Registry 来下载模式。

我们的数据的 avro 模式很少改变,目前我们的应用程序只要有记录进入就会调用模式注册表,这太过分了。

我从 confluent 遇到了CachedSchemaRegistryClient,它看起来很有希望。虽然在研究了它的实现之后,我不确定如何使用它的内置缓存来减少对 Schema Registry 的 REST 调用。

上面的链接会将您带到源代码,这里我将粘贴与将模式附加到 CachedSchemaRegistryClient 的缓存有关的唯一方法。

该方法的目的是向 Schema Registry 以及本地缓存注册一个 schema,并返回其 schemaID;如果架构已在本地存在,则返回 schemaID。如果我们正在注册一个完整的新模式,这将非常有效。

但是在架构已经在架构注册表中注册的情况下(在我们的情况下由另一个应用程序注册),我们只想将架构放在 CachedSchemaRegistryClient 的本地缓存中以便于快速访问 - 我个人认为不支持到今天为止,是否有没有自定义的干净解决方法?

我们考虑过自己维护一个本地缓存,但如果 confluent 可以提供一些东西,我们希望将其保留为最后的手段。

任何建议/想法表示赞赏,在此先感谢。

0 投票
1 回答
213 浏览

apache-kafka - 具有 SSL 配置的 Confluent schema-registry 2.0.1 版本

我需要使用融合模式注册表连接到 0.9 kafka。但是对于 schema-registry 2.0.1 版本,我没有看到 SSL 配置可用。有没有办法启用 ssl for schema-registry 和 kafka-rest 与 0.9 kafka 对话?

0 投票
2 回答
1021 浏览

elasticsearch - Kafka Connect 的 Elasticsearch 连接器 - 偏移量和时间戳

我正在为 kafka 连接使用弹性搜索连接器(汇合)。我正在从主题获取消息到弹性搜索索引。如果没有使用任何键,我可以看到偏移量是 _id 的一部分。我想根据偏移量和时间戳浏览 elasticseach 中的消息。

是否可以将消息的偏移量和时间戳作为索引中的字段?

这是弹性搜索文档

{ "_index": "test-elasticsearch-sink", "_type": "kafka-connect", "_id": "test-elasticsearch-sink+0+0", "_score": 1, "_source": { "f1": "value1" } }

谢谢, 拉杰什

0 投票
1 回答
695 浏览

hadoop - 带有 kafka-connect 的多个 hive 分区

在此过程中,我一直在尝试使用 kafka-connect 通过配置单元集成将数据流式传输到 HDFS。

我的用例要求我使用“FieldPartioner”作为分区器类。

我的问题是,我无法获得多个分区。

例子:

我的示例 JSON

我想根据“mydate”和“hour”进行分区

我尝试了以下

还尝试将 partition.field.name 指定为

以及更多这样的组合

对此问题的任何帮助将不胜感激

谢谢。

0 投票
2 回答
3001 浏览

apache-kafka - Kafka Connect HDFS Sink 使用 JsonConverter 实现 JSON 格式

以 JSON 格式从 Kafka 生产/消费。使用以下属性以 JSON 格式保存到 HDFS:

制片人:

消费者 :

问题一:

获得例外:

问题 2:

启用以上两个属性不会引发任何问题,但不会通过 hdfs 写入数据。

任何建议将不胜感激。

谢谢