问题标签 [pykafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
424 浏览

python - 生成一批消息后,我可以从 Kafka 产品中调用一次 get_delivery_report 吗?

我正在使用 pykafka,并且有一个异步生产并使用 delivery_reports 的生产者。我知道必须使用“get_delivery_report”方法阅读交付报告,并且我知道必须在与生成的消息相同的线程中调用它。但是,get_delievery_report 是否必须在每次调用后调用才能生成,还是可以调用一次?如果发生不止一个,get_delivery_report 将返回所有失败的发送。例如,假设我异步发送 100 条消息:

还是必须是:

第一个似乎比第二个运行得快得多。

0 投票
2 回答
339 浏览

python - PyKafka 元数据以字节而不是字符串为单位

我看到 PyKafka 的异常行为,这是我最近才开始使用的客户端。

错误如下:

错误的根源在于以下几行:

调试我看到客户端使用正确的字符串 IP 连接到种子代理但是当检索到代理列表时,他们的 IP 是二进制的,当 PyKafka 尝试再次连接以创建消费者时,这些 IP 显然不起作用.

另一个可能相关的问题是我需要自己将主题名称和消费者组名称转换为字节(与其他客户端一样),但文档中的所有示例都显示了字符串的用法。

Kafka 代理版本:0.10.1.0 PyKafka 版本:2.7.0

0 投票
0 回答
171 浏览

python - 使用 pykafka 读取特定记录

我想在 Kafka 中存储大文件,使用有关记录的元数据在将来检索它们。

因此,我发送包含主题、partition_id、偏移量的消息,然后尝试以这种方式检索文件:

但它不起作用,只是打印:

这个错误非常神秘,它发生在 reset_offsets 上。当我尝试消费时,该进程然后卡住等待 rebalancing_lock。我究竟做错了什么?

0 投票
1 回答
252 浏览

python - 如何使用 kafka 主题并通过 http 解析/服务?

我正在尝试在 python 中使用 kafka 主题并使用 prometheus 客户端通过 http 提供服务,但我似乎在主题消费上被阻止了。我放置了一些占位符来简单地添加指标,但看起来那部分被阻止了。

如果我运行代码,我会看到主题数据按预期流式传输到控制台。但是,我的度量端点永远不会被填充,并且对 Web 服务器的任何请求都会挂起,直到我终止应用程序,它会使用库中的标准度量来响应该应用程序。

0 投票
1 回答
546 浏览

apache-kafka - PyKafka 客户端连接错误的代理

我无法弄清楚发生了什么。

我通过运行以下命令启动了生产者:

和消费者:

但是当我这样做时

它总是说: Failed to connect newly created broker for b'adnans-mbp':9092

在发送消息时,我在程序中一无所获。尽管在控制台上它正在接收消息。

我在做什么错?

0 投票
0 回答
326 浏览

python - PyKafka消费者与终端消费者发生冲突

我想通过分布式微服务架构实现 Kafka 进行消息传递。

我正在使用PyKafka并实现了虚拟生产者和(平衡)消费者。我将所有消费者分配到同一个消费者组我同时使用来自Python 和控制台的生产者没有问题,甚至在运行时添加它们。

但是,我对消费者有疑问。我可以创建多个 Python 消费者,甚至可以在运行时添加它们。但是,当我将控制台使用者(kafka-console-consumer)添加到具有 Python 使用者的组时,我收到互斥错误:

从消费者 ID 'b'Michals-MacBook-Pro.local:1722eea0-07d3-4be4-9d97-8b7fb15b0b30'' 提交主题 'b'michal_sample_topic'' 的偏移时出错(错误:{'pykafka.exceptions.UnknownMemberId': [0 , 1]})

此外,这两者(即使它们属于同一个消费者组)都在消费消息(Python消费者在他们自己之间平衡它并在他们之间控制台消费者)

现在,我是 Kafka 的新手,但我的第一印象是 Kafka 应该与消费者的实现无关,因此应该可以将它们结合起来。我的理解是 PyKafka 还是我对 PyKafka 的实施有问题?

制片人:

消费者:

0 投票
4 回答
10666 浏览

apache-kafka - 如何在不消费的情况下读取来自 kafka 消费者组的消息?

我正在使用跨多台机器的通用消费者组管理一个 kafka 队列。现在我还需要显示队列的当前内容。我如何仅读取组中尚未读取的那些消息,同时使这些消息再次被实际处理这些消息的组中的其他消费者读取。任何帮助,将不胜感激。

0 投票
0 回答
693 浏览

apache-kafka - Kafka 长协调器加载时间和小 ISR

我正在使用 Kafka 0.8.2.1,运行一个具有 200 个分区和 RF=3 的主题,日志保留设置为大约 1GB。

未知事件导致集群进入“协调器负载”或“组负载”状态。一些信号表明了这一点:基于 pykafka 的消费者在 s 期间开始失败,对于某些分区子集,FetchOffsetRequest错误代码为 14 。COORDINATOR_LOAD_IN_PROGRESS这些错误是在使用自协调器加载之前就存在的消费者组消费时触发的。在代理日志中,出现了这样的消息:

出于某种原因,Kafka 决定副本 11 是“首选”副本,尽管它不在 ISR 中。据我所知,当 11 重新同步时,消费可以从副本 12 或 13 不间断地继续 - 目前尚不清楚为什么 Kafka 选择非同步副本作为首选领导者。

上述行为持续了大约 6 个小时,在此期间 pykafka fetch_offsets 错误使消息无法消费。虽然协调器负载仍在进行中,但其他消费者组能够毫无错误地使用该主题。事实上,最终的解决方法是使用新的 consumer_group 名称重新启动损坏的消费者。

问题

  1. 协调器负载状态持续 6 小时是正常的还是预期的?此加载时间是否受日志保留设置、消息生成率或其他参数的影响?
  2. 非 pykafka 客户端是否COORDINATOR_LOAD_IN_PROGRESS仅通过使用非错误分区来处理?Pykafka 坚持所有分区都返回成功OffsetFetchResponse的 s 可能是消耗停机时间的来源。
  3. 为什么 Kafka 在协调器加载期间有时会选择非同步副本作为首选副本?如何将分区领导者重新分配给 ISR 中的副本?
  4. 所有这些问题都没有意义,因为我应该只使用更新版本的 Kafka 吗?

代理配置选项:

0 投票
1 回答
355 浏览

python - Pykafka - 异步发送消息和接收确认

PyKafka 有以下限制:

传递报告队列是线程本地的:它只会为从当前线程产生的消息提供报告

我正在尝试编写一个脚本,我可以在其中使用一个函数异步发送消息,并通过另一个函数继续接收确认。

以下是功能:

线程和多处理没有帮助。以上两个函数需要异步/并行运行。这里应该使用什么方法?

0 投票
1 回答
835 浏览

python - 多个主题及其优先级

我正在使用 pykafka 来消费消息,现在我正在使用 balance_consumer 来消费来自一个主题的消息。现在我必须消费来自另一个主题的消息,如果可以优先消费来自不同主题的消息。我该如何处理这个问题?可能是python的其他库?