问题标签 [pykafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
360 浏览

python - pykafka queued.max.messages.kbytes 超出允许范围

我正在尝试将pykafkardkafka一起使用来使用消息,但是在尝试初始化使用者时出现以下错误。

配置属性“queued.max.messages.kbytes”值 102400000 超出允许范围 1..2097151

以下是我的代码。有人可以帮我解决这个错误吗?

0 投票
0 回答
332 浏览

python - 为什么我的 apache kafka 消费者随机忽略排队的消息?

这可能是一个 eisenbug,所以我不期待硬性答案,而是更多关于寻找什么能够复制该错误的提示。

我有一个由多个服务组成的事件驱动、基于 kafka 的系统。目前,它们被组织成线性管道。一个主题,一种事件类型。每个服务都可以被认为是从一种事件类型到一种或多种事件类型的转换。

每个转换都作为一个 python 进程执行,有自己的消费者和自己的生产者。它们都共享相同的代码和配置,因为这都是从服务实现中抽象出来的。

现在,有什么问题。在我们的暂存环境中,有时(假设每 50 条消息中有一条)Kafka 上有一条消息,但消费者根本没有处理它。即使您等待数小时,它也会挂起。这不会发生在本地环境中,我们无法在其他任何地方重现它。

一些更相关的信息:

  • 这些服务经常出于调试目的而重新启动,但挂起似乎与重新启动无关。
  • 当消息挂起并且您重新启动服务时,该服务将处理该消息。
  • 这些服务是完全无状态的,所以没有缓存或其他奇怪的事情发生(我希望)
  • 发生这种情况时,我有证据表明他们仍在处理旧消息(我在他们产生输出时记录,这发生在消费者循环结束之前)
  • 在当前的部署中,消费者组中只有一个消费者,因此在相同的服务中没有并行处理,也没有服务的水平扩展

我如何消费:

我使用 pykafka,这是消费者循环:

我的假设是我使用消息的方式存在一些问题,或者消费者组偏移量存在一些不一致的状态,但我无法真正解决这个问题。

我也在考虑搬到浮士德来解决这个问题。鉴于我的代码库和架构决定,过渡应该不会太难,但在开始这样的工作之前,我想确定我正在朝着正确的方向前进。现在这只是一个盲目的尝试,希望造成问题的一些细节会消失。

0 投票
2 回答
265 浏览

python - PyKafka API 使用

我是 Kafka 和 PyKafka 的新手。我知道生产者和消费者是通过以下代码在 PyKafka 中创建的。

我想知道 KafkaClient 是什么,以及它如何帮助创建生产者和消费者。

我读过我们也可以使用client.clusterand创建集群和代理client.broker,但我不明白client这里的用法。

0 投票
0 回答
1366 浏览

apache-kafka - 生产/消费到远程 Kafka 不起作用

我已经通过 Bitnami AMI 映像设置了一个运行 Apache Kafka 0.8 的 AWS EC2 实例。服务器属性几乎是默认的(Kafka 位于 localhost:9092,zookeeper 位于 localhost:2181)。

当我通过 SSH 连接到机器时,我可以使用位于 kafka/bin 的 Kafka 提供的脚本来生成/使用数据。为了生产,我运行以下命令:

消费:

这工作正常,因此我确定 Kafka 工作正常。接下来,我尝试使用 python 库 pykafka 从我的机器上生产/消费:

我消费如下:

这些片段运行时没有错误或异常,但不会产生或使用任何消息,甚至是通过本地脚本创建的消息。

我已经确认 9092 和 2181 端口都是通过 telnet 打开的。我的问题如下:

  • 有没有办法调试此类问题并找到根本原因?如果存在一些连接问题,我希望库会抛出异常。
  • 到底是怎么回事?
0 投票
0 回答
111 浏览

apache-spark - 如何使用 pykafka 消费者从主题中获取数据

我不知道如何从 Pykafka 消费者那里获取数据。我什至有问题打印消费者的主题。问题是,无论我对消费者调用什么方法,这个过程都会永远挂起。如果我只是初始化消费者而不使用它,则该过程完成。感谢您提前提供任何帮助。

0 投票
0 回答
297 浏览

python - 如何与 kafka/pykafka 平衡消费者合作?

我对卡夫卡完全陌生。我和简单的消费者一起工作,没关系。现在我已经和平衡的消费者一起工作了,问题是我对卡夫卡只有一个模糊的概念。

这是我最初的方法想法。然而,循环只是停留在 consumer_a 中。我不确定如何处理循环退出。我想过让循环也处理我的另一个异步调用,但我不确定它是否正确。

任何人都可以提出相同的建议或指出一些更好的外行可以理解的文档吗?还是我错过了其他一些概念?

我的要求是同时处理两个消费者。谢谢

编辑 如果有人可以解释分区的概念,它也会很有用。

0 投票
1 回答
289 浏览

python - 类型错误:生产()得到了一个意外的关键字参数“linger_ms”

我正在尝试将 kafka 与 python 与 pykafka 一起使用,当我尝试使用 linger_ms 时出现此错误:

类型错误:生产()得到了一个意外的关键字参数“linger_ms”

这是我的代码:

0 投票
1 回答
226 浏览

apache-kafka - 无法从远程机器与 Kafka 集群通信

所以我一直在尝试在非 kafka 集群上使用 pykafka 发送消息(它没有 kafka,只有必要的库)使用附加的代码片段,我将消息发送到 kafka 集群节点以供使用。但它返回超时异常。

我已经尝试了几乎所有可用的东西,即使是在 stackoverflow 上已经创建的问题。

问题:

是否需要在我的非 kafka 集群上也有 kafka 才能成功通信?(不要这么认为)

任何帮助,将不胜感激 ??

服务器配置:

0 投票
1 回答
517 浏览

apache-kafka - 无法使用 pykafka 发送消息

我正在使用 pykafka,我可以获取主题名称,但我无法发送消息。我的代码如下所示

我收到此错误消息

0 投票
1 回答
2102 浏览

python-3.x - Pytest 一个 KAFKA 消费者和生产者(集成测试)

我是 Pytest 框架的新手。我目前有一个 Python 应用程序,它使用 Kafka-Python 消费和生成消息。我正在尝试编写集成测试,以验证是否已从主题消费和生产。我目前正在使用pytest-docker一个带有生产者和消费者主题的 Kafka 容器。我遇到间歇性错误“NoBrokersAvailable,无法识别的代理版本”我不确定我做错了什么。任何帮助将不胜感激。

码头工人-compose.yml:

conftest.yml:

测试.py:

制片人: