问题标签 [confluent-platform]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
rest - 如何配置 confluent kafka rest 序列化器?
是否可以更改融合的 kafka-rest 序列化程序?我想将融合的 avro 序列化程序交换为我的自定义序列化程序,但我检查了生产者池内部(https://github.com/confluentinc/kafka-rest/blob/2.x/src/main/java/io/confluent /kafkarest/ProducerPool.java)。
在构建 buildAvroProducer 时,它要求使用 KafkaAvroSerializer,这是一个融合库。所以似乎不可能在不更改代码的情况下对其进行自定义(比如简单地在类路径中交换 jar 或 sth)不确定这个要求是否公平,并且想知道它是否在路线图上。
apache-kafka - 使用 Avro 转换器运行 Kafka Connect:ConfigException:“缺少架构注册表 url”
嗨,我正在运行 Kafka Connect docker 图像
并得到
不知道在哪里添加“schema.registry.url”配置!
hadoop - 【HDFS connector + Kafka】如何在单机模式下编写多个topic?
我正在使用Confluent
'sHDFS Connector
将流式数据写入HDFS
. 我按照用户手册快速入门并设置了我的连接器。当我只使用一个主题时,它可以正常工作。我的属性文件看起来像这样
当我添加多个主题时,我看到它不断提交偏移量并且我没有看到它写入提交的消息。
我尝试使用 tasks.max 和 1 和 2。我不断Committing offsets
记录如下
当我优雅地停止服务 (Ctrl+C) 时,我看到它正在删除tmp
文件。我究竟做错了什么?正确的方法是什么?感谢您对此的任何建议。
json - Confluent kafka rest 可选字段
将 avro 有效负载发布到 kafka-rest 服务时,如果 value_schema 中的字段之一使用默认值定义,并且在记录中省略此字段。似乎 kafka-rest 仍然坚持将其记录在案。这样做的正确方法是什么?
例如。
输出
c# - rdkafka dotnet 库缓慢的消费者性能
现在我正在关注 OnMessage 事件处理程序的简单/高级消费者示例,性能很差。(每分钟 120k 条消息与(使用我们的 C++ 库每分钟 1M 条消息)
通过等待 OnMessage 事件而不是实现繁忙的循环并使用 .Consume(TimeSpan) 来消耗消息可能是延迟?
java - 如何在不调用注册新模式的情况下填充 CachedSchemaRegistryClient 中的缓存?
我们有一个与 Kafka 集成的 spark 流应用程序,我正在尝试对其进行优化,因为它会过度调用 Schema Registry 来下载模式。
我们的数据的 avro 模式很少改变,目前我们的应用程序只要有记录进入就会调用模式注册表,这太过分了。
我从 confluent 遇到了CachedSchemaRegistryClient,它看起来很有希望。虽然在研究了它的实现之后,我不确定如何使用它的内置缓存来减少对 Schema Registry 的 REST 调用。
上面的链接会将您带到源代码,这里我将粘贴与将模式附加到 CachedSchemaRegistryClient 的缓存有关的唯一方法。
该方法的目的是向 Schema Registry 以及本地缓存注册一个 schema,并返回其 schemaID;如果架构已在本地存在,则返回 schemaID。如果我们正在注册一个完整的新模式,这将非常有效。
但是在架构已经在架构注册表中注册的情况下(在我们的情况下由另一个应用程序注册),我们只想将架构放在 CachedSchemaRegistryClient 的本地缓存中以便于快速访问 - 我个人认为不支持到今天为止,是否有没有自定义的干净解决方法?
我们考虑过自己维护一个本地缓存,但如果 confluent 可以提供一些东西,我们希望将其保留为最后的手段。
任何建议/想法表示赞赏,在此先感谢。
apache-kafka - 具有 SSL 配置的 Confluent schema-registry 2.0.1 版本
我需要使用融合模式注册表连接到 0.9 kafka。但是对于 schema-registry 2.0.1 版本,我没有看到 SSL 配置可用。有没有办法启用 ssl for schema-registry 和 kafka-rest 与 0.9 kafka 对话?
elasticsearch - Kafka Connect 的 Elasticsearch 连接器 - 偏移量和时间戳
我正在为 kafka 连接使用弹性搜索连接器(汇合)。我正在从主题获取消息到弹性搜索索引。如果没有使用任何键,我可以看到偏移量是 _id 的一部分。我想根据偏移量和时间戳浏览 elasticseach 中的消息。
是否可以将消息的偏移量和时间戳作为索引中的字段?
这是弹性搜索文档
{
"_index": "test-elasticsearch-sink",
"_type": "kafka-connect",
"_id": "test-elasticsearch-sink+0+0",
"_score": 1,
"_source": {
"f1": "value1"
}
}
谢谢, 拉杰什
hadoop - 带有 kafka-connect 的多个 hive 分区
在此过程中,我一直在尝试使用 kafka-connect 通过配置单元集成将数据流式传输到 HDFS。
我的用例要求我使用“FieldPartioner”作为分区器类。
我的问题是,我无法获得多个分区。
例子:
我的示例 JSON
我想根据“mydate”和“hour”进行分区
我尝试了以下
还尝试将 partition.field.name 指定为
和
以及更多这样的组合
对此问题的任何帮助将不胜感激
谢谢。
apache-kafka - Kafka Connect HDFS Sink 使用 JsonConverter 实现 JSON 格式
以 JSON 格式从 Kafka 生产/消费。使用以下属性以 JSON 格式保存到 HDFS:
制片人:
消费者 :
问题一:
获得例外:
问题 2:
启用以上两个属性不会引发任何问题,但不会通过 hdfs 写入数据。
任何建议将不胜感激。
谢谢