问题标签 [kafka-producer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Apache Kafka Producer Program 正在抛出 kafka.common.KafkaException、java.nio.channels.ClosedChannelException 和 kafka.common。如何解决?
[2016-02-17 09:02:36,876] 错误无法发送对具有 [0,32] 中相关 ID 的主题推文的请求
如何解决此问题任何帮助将不胜感激
提前致谢
maven - Kafka Maven 依赖项
以下两个依赖项有什么区别?我真的需要第一个来制作消费者或生产者应用程序吗?
我的 Producer 只适用于第一个,但消费者需要第二个。
我曾认为“kafka-clients”工件适用于生产者和消费者。但看起来“kafka.consumer.Consumer”来自另一个依赖项。为什么有区别?
另外,为什么第一个工件命名为kafka_2.9.2?即为什么名称中有版本标识符?
apache-kafka - Kafka消费者Java代码不起作用
我刚开始使用 Kafka,我能够通过命令提示符生成和使用数据,甚至可以从远程服务器通过 Java 代码生成数据。
但我正在尝试这个简单的消费者 Java 代码,但它不起作用。
输出是: -
没有错误,程序没有终止或给出任何输出!!
动物园管理员日志
我在 Kafka 控制台中无限循环地得到这个。请解释
我以以下方式创建了主题
我可以在命令提示符下使用它
无法理解是什么问题。
networking - 如何连接到 Virtualbox 上的 kafka 集群?
这是我在本地的设置:三个虚拟机(使用 Virtualbox)、kafka 和 zookeeper 都安装在这三个虚拟机上。他们也都在互相交谈。
我正在尝试使用本地的 kafka-console-producer,这需要代理列表。我正在提供我的虚拟机的 IP,但它似乎不起作用。我也尝试过虚拟机上的 Advertisementd.host 属性,但对我没有影响。这是我的三台机器的 server.properties:
服务器 1:
服务器 2:
服务器 3:
我的 virtualbox 也有端口转发设置:
同样,对于其他两台机器,端口也只是稍微调整了一下。
我可以很好地连接到 Zookeeper,所以:
能够连接到 VM 上的 zookeeper。但是,如果我尝试连接 kafka-console-producer,当我尝试发送消息时它会失败:
导致:
不知道我在这里做错了什么?有任何想法吗?(如果有人愿意,我可以提供 ifconfig 输出)。任何帮助将不胜感激。
[编辑 1]:添加 zookeeper quorum 的输出:
apache-kafka - Kafka 是否支持请求响应消息传递
我正在研究 Kafka 9 作为一个爱好项目,并完成了一些“Hello World”类型的示例。
我不得不考虑基于请求响应消息的真实世界 Kafka 应用程序,更具体地说,如何将 Kafka 请求消息链接到其响应消息。
我正在考虑使用生成的 UUID 作为请求消息键并将此请求 UUID 用作关联的响应消息键。与 WebSphere MQ 具有消息相关 ID 的机制类型非常相似。
我的结束 2 结束过程将是。
1)。Kafka 客户端生成随机 UUID 并发送单个 Kafka 请求消息。2)。服务器将使用此请求消息提取并存储请求 UUID 值 3)。使用消息有效负载完成业务流程。4)。使用请求消息中存储的 UUID 值作为响应消息 Key 的响应消息进行响应。5)。Kafka 客户端轮询响应主题,直到超时或检索到具有原始请求 UUID 值的消息。
我担心的是 Kafka 消费者轮询会从响应主题中删除其他客户端消息,并增加偏移量,使其他客户端失败。
我是否正在尝试将 Kafka 应用到它从未设计过的用例中?
是否可以在 Kafka 中实现请求/响应消息传递?
apache-kafka - 我有一个包含 3 个分区和 3 个复制的 Kafka 主题。我的 Storm 拓扑如何接受来自这 3 个分区和复制的数据
我用 3 个分区和 3 个复制创建了一个 Kafka 主题
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
并编写了 Kafka-Producer Program 从 API 获取数据,它工作正常。我的数据成功存储在 3 个分区和 3 个复制中。
现在我必须处理分区 Kafka 日志中的数据。我有一个风暴拓扑。我的拓扑如何接受来自日志的数据作为输入。
最初我创建了一个具有 1 个分区、1 个复制的主题,并且我的风暴拓扑工作正常。但是当我将分区保持为 3 并且复制为 3 时,数据不会进入风暴拓扑。如何解决它。
我的拓扑代码
帮我解决这个问题
提前致谢
apache-kafka - Kafka block.on.buffer.full 默认值
我正在检查 kafka(0.9.0.1) 生产者配置,block.on.buffer.full
文档中的属性说:
当我们的内存缓冲区耗尽时,我们必须停止接受新记录(块)或抛出错误。默认情况下,此设置为 true,我们会阻止,但在某些情况下,阻止是不可取的,最好立即给出错误。将此设置为 false 将完成:如果发送记录并且缓冲区空间已满,生产者将抛出 BufferExhaustedException。
所以理论上它应该是真的,但是在同一个文档(http://kafka.apache.org/documentation.html)中,该表有一个名为“default”的列,指出默认值实际上是假的。
哪一个是正确的?
apache-kafka - 生产者和消费者是否需要指定分区
我们计划让生产者(Java rest api)和消费者(Java 客户端)在多台机器上运行,并且所有消费者都属于同一个消费者组。
1)在这种情况下,如果我在生产者发布消息期间未指定分区,其中 Kafka 写入随机/默认分区......
2)当消费者检索消息时,如果我不提及分区,我会传递主题名称..并将偏移量提交给Kafka服务器..
如果我有一个消费者群体,这是一个好方法吗???
3)我是否需要拥有 Kafka 休息代理,据我了解,这不是必需的,因为我正在使用 Java 客户端..如果我错了,请纠正我...请让我知道如果我有什么好处我会得到在这里使用 Kafka 休息代理....
apache-kafka - KAFKA:找到已发布消息的分区?
我使用 KeyedMessage data = new KeyedMessage("topic",partition_key, msg); 发布了一条消息
如何重新检查消息是否到达了哪个分区?
java - 如何在 Kafka 中设置消息的大小?
我目前正在使用 Kafka 0.9.0.1。根据我发现的一些来源,设置消息大小的方法是修改server.properties
.
- message.max.bytes
- 副本.fetch.max.bytes
- fetch.message.max.bytes
我的server.properties
文件实际上有这些设置。
其他可能相关的设置如下。
但是,当我尝试发送有效负载为 4 到 6 MB 的消息时,消费者永远不会收到任何消息。生产者似乎在没有抛出任何异常的情况下发送消息。如果我确实发送了较小的有效负载(例如 < 1 MB),那么消费者确实会收到消息。
知道我在配置设置方面做错了什么吗?
这是发送消息的示例代码。
这是接收消息的示例代码。
以下是为生产者和消费者填充属性文件的方法。