问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4420 浏览

apache-kafka - Kafka Producer 0.9 小消息的性能问题

我们观察到 Java Kafka Producer 0.9 客户端在发送小消息时性能非常差。消息不会累积到更大的请求批次中,因此每个小记录都是单独发送的。

我们的客户端配置有什么问题?还是这是其他问题?


使用 Kafka 客户端 0.9.0.0。我们在 Kafka 未发布的 9.0.1 或 9.1 固定或未解决列表中没有看到任何相关的帖子,因此我们专注于我们的客户端配置和服务器实例。

我们理解 linger.ms 应该导致客户端将记录累积到一个批次中。

我们将 linger.ms 设置为 10(也尝试了 100 和 1000),但这些并没有导致批量累积记录。对于大约 100 字节的记录大小和 16K 的请求缓冲区大小,我们预计在单个请求中发送大约 160 条消息。

尽管已经分配了一个新的 Bluemix Messaging Hub (Kafka Server 0.9) 服务实例,但客户端的跟踪似乎表明该分区可能已满。测试客户端在没有其他 I/O 的情况下循环发送多条消息。


日志显示带有可疑行的重复序列:“ Wake up the sender since topic mytopic partition 0 is full or getting a new batch ”。

因此,在我们的测试用例中,新分配的分区本质上应该是空的,那么为什么生产者客户端会得到一个新批次呢?


我们提供了以下属性文件:


Kafka 客户端显示扩展/合并的配置列表(并显示 linger.ms 设置):


发送 100 条记录后的 Kafka 指标:

谢谢

0 投票
1 回答
1507 浏览

apache-kafka - 卡夫卡消费者无法反序列化抛出索引越界

生产者代码如下

消费者代码如下

当我尝试将消息转换为对象时,它失败了。我从其中一篇文章中获得了自定义序列化程序代码,如下所示。任何人都可以指出实施有什么问题。我尝试使用自定义序列化程序中的 FromBytes 并没有帮助。序列化程序返回空对象

自定义序列化器

PltResultPage 如下。

0 投票
1 回答
812 浏览

apache-kafka - 如何在 Kafka 中创建多个消费者以从生产者那里读取相同的消息,并在收到这些消息后在每个消费者处执行不同的任务

我有一个用例,我需要创建 3 个消费者,它们可以从生产者发送的单个或多个分区中读取所有消息。我需要我的 3 个消费者在收到相同的消息后执行 3 个不同的任务。

一种很好的方法是创建一个具有不同 group.id 的消费者组,方法是使用

props.put("group.id", UUID.randomUUID().toString())

我已经为这个想法参考了以下链接。

Kafka 多个消费者用于一个分区

我坚持知道如何使用 ConsumerGroupExample 代码来实现这一目标?如何创建多个消费者?以及收到消息后如何分别管理?我需要创建 ConsumerGroupExample 的多个对象吗?

0 投票
2 回答
992 浏览

java - 使用出站适配器配置断路器以处理连接超时问题

通过上述配置,我们正在尝试:

1.期望将失败的消息传播到没有发生的错误通道 =“failedChannel2”。因为我在控制台中看不到转换后的输出。

2.CircuitBreaker 正在为 ServiceActivator 工作(对于应用程序相关的异常,如上所述),但是我们如何为出站适配器的失败案例配置 CB。例如:当连接超时或服务器突然关闭/网络连接问题/在将消息从 SI 通道发送到外部(kafka)服务器之前出现一些环境问题。我们可以为这种情况配置带有出站适配器的 CB。

根据关于断路器建议的 SI 文档,如下所示。

“通常,此建议可能用于外部服务,可能需要一些时间才能失败(例如尝试建立网络连接的超时)”。

请就如何实现这一点提出建议。非常感谢。

更新配置:

日志 :

01-05@23:44:18,598 调试 org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id =e0591162-3b93-9bb6-0699-89b15b20e904}] 调试:-com.XXX.ProducerMessageHandler#0 收到的消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904} ] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序 [com.XXX.ProducerMessageHandler#0] 发生错误;嵌套异常是 java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 01-05@23:44:18,606 调试 org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean $1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 调试:-com.XXX.ProducerMessageHandler#0 收到消息:GenericMessage [payload=hello , headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 出现异常:org.springframework.messaging.MessageHandlingException: 消息处理程序 [com.XXX.ProducerMessageHandler#0] 发生错误;嵌套异常是 java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel。PublishSubscribeChannel - 在通道“toKafka”上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 01-05@23:44:18,606 DEBUG org.springframework .integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 得到异常:org.springframework.messaging.MessageHandlingException:消息处理程序 [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6] 中发生错误;嵌套异常是 org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework。integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 01-05@23:44:18,606 调试 org.springframework.integration.channel.PublishSubscribeChannel - 在通道“toKafka”上预发送,消息:GenericMessage [payload=hello,headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 01-05@23:44:18,606 调试 org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序 [org.springframework.integration.config. ServiceActivatorFactoryBean$1@6a0ef4b6]; 嵌套异常是 org.springframework.integration。

0 投票
0 回答
559 浏览

apache-kafka - 使用 kafka 分区重新分配工具在 kafka 后看到“分区不存在”警告/失败

我正在使用卡夫卡 0.8.1.1。我有一个 3 节点 kafka 集群,其中一些主题有大约 5 个分区。我计划将集群中的节点数量增加到 5 个,并将一些分区从现有主题移动到新的代理。

我看到的错误消息:

我进行了搜索,但找不到任何相关答案。感谢任何指导/帮助来解决这个问题。

0 投票
1 回答
416 浏览

scala - 每秒可调整消息量的 Kafka 生产者

编写具有稳定但可调节输出的 Apache Kafka 生产者的最佳方法是什么。

示例:生产者应该向代理发送恒定的 1000 条消息/秒。在运行期间,输出应可调整为 10 或 10000 条消息/秒。

一种方法是设置一个调度程序,它每秒运行一次并批量发送预定义数量的消息。

另外:由于这个生产者应该是性能测试框架的一部分,所以需要发送的消息量非常高。有人将如何处理非常高的负载?使用 Akka 会有好处吗?

目标语言是 Scala,但任何语言的示例代码都非常受欢迎。

0 投票
1 回答
1417 浏览

java - 是否可以将二进制文件(例如 .mp3 文件)作为单个消息放入 Kafka 中?如果是,如何?

我对 Apache Kafka 比较陌生。作为一个小项目的一部分,我试图将一个文本日志文件作为一条消息放在 Kafka 中。我遇到了一些编码错误。Kafka的java API(据我所知)包含大部分字符串编码的规定。

作为一种解决方法,我将日志文件逐行放入 Kafka 中,其中每一行代表一条消息,但这并不能解决我最初的问题陈述——一个文件作为一条消息。

0 投票
1 回答
1480 浏览

java - 我们如何在 Kafka pub/sub 中对消息进行版本控制?

我有一个关于使用 Apache Kafka 的问题,详情如下。

我构建了一个服务的 3 个实例,它们有自己的本地缓存,称为 S1、S2、S3。例如,当更新请求到达 S1 时,S1 将更新自己的本地缓存,然后将作业推送到 Kafka 以更新 S2、S3 的本地缓存。在这种情况下,我只想让 S2、S3 得到这份工作,但实际上,当我使用 Kafka 进行发布/订阅时,所有 S1、S2、S3 都会得到这份工作。

那么我该如何处理这个问题,任何建议都会有所帮助。

0 投票
2 回答
3474 浏览

java - 如何在 Storm 方案类中解析来自 kafka 主题的 json 数据?

我正在从 kafka 主题中获取 json 数据。我如何应用 json 解析来获取使用反序列化方法的风暴方案类中所有对象的所有字段,之后我将值返回到新的返回值()。(backtype.storm.tuple.Values 类方法) ?即,如果我的主题中有 2 个 json 对象,我循环它们以获取所有字段,最后我必须将所有值返回到 return 方法。我的返回应该包含两个 json 对象的所有字段。

我的问题:return 方法中只返回了 2 个 obj json 数据。我认为第二个对象的所有字段都覆盖了第一个对象字段。最后返回第二个对象字段。

你们中的任何人都可以给我一个返回所有对象字段(1,2 个对象字段)的想法......

提前致谢

0 投票
1 回答
995 浏览

java - 将 pojos 转换为 confluent.io 中的通用记录以通过 KafkaProducer 发送

我对 Kafka 和 avro 完全陌生,并尝试使用 confluent 包。我们有用于 JPA 的现有 POJO,我希望能够简单地生成我的 POJO 的实例,而不必手动将每个值反映到通用记录中。我似乎错过了如何在文档中完成此操作。

这些示例使用通用记录并一一设置每个值,如下所示:

有几个从类中获取模式的示例,我发现注释可以根据需要更改该模式。现在我如何获取 POJO 的实例并将其按原样发送到序列化程序,并让库完成匹配类中的模式的工作,然后将值复制到通用记录中?我对这一切都错了吗?我最终想要做的是这样的:

谢谢!