问题标签 [pykafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
32 浏览

python - 现在反馈连接是否有效

我正在尝试使用 pykafka 创建一个简单的客户端。为此,我需要 SSL 证书。客户端在 RHEL 7 和 Python 3.6.x 下运行。看起来连接正常,但我没有得到任何反馈或数据,只有黑屏。如何检查连接或获取错误消息。

0 投票
0 回答
52 浏览

python-3.6 - pykafka 在nosetests 集成测试上的死锁

我们在 python3.6 上遇到了 pykafka 2.7.0 的问题。有一些集成测试,出于某种原因,在我们生成的主题上形成了死锁。在所有测试完成后,nosetests 拒绝完成。当我们在 python2.7 上运行相同的测试时不会发生这种情况,并且我们在 python3.6 上使用了 pykafka 2.8.0 没有任何帮助。

修复它的唯一方法是在向主题生成消息后立即停止/删除生产者的临时修复(如您在最后 2 行代码中所见),这会花费大量时间来停止生产者。

如果最后两行不存在,我在 gdb 中看到该进程在垃圾收集后立即卡住,并试图停止生产者。然后它挂在_wait_for_tstate_lock。它阻止了我们在 Jenkins 中的 CI 完成,并且想知道为什么它在垃圾收集上死锁,但在代码中调用时却没有死锁。

0 投票
0 回答
150 浏览

python-3.x - 如何动态创建kafka生产者

首先我在python和kafka中做婴儿步骤,所以假设我有一个listA = [item1,item2,item3]并且listA的每个项目都是producer一个主题。现在我想要的是动态地将项目添加/删除到 listA 并立即成为生产者,而且每个项目都应该在它自己的线程上运行,因为它们应该是独立的。

所以基本上我正在尝试扩展应用程序。

到目前为止,我尝试对每个生产者项目进行硬编码并在其自己的终端中运行它

每一个项目

0 投票
2 回答
2921 浏览

python - 将 csv 文件写入 kafka 主题

我有一个很大的 csv,我想写一个 kafka 主题。

此代码产生错误:

该文件如下所示:

谁能帮我这个?

0 投票
0 回答
69 浏览

python - 高延迟 PyKafka

我们正在制作一个实时产品,它接收图像并发回相关信息。出于可扩展性的目的,我们决定使用 Kafka 来平衡 Kubenertes 节点之间的工作负载。

前端 -> MainWorker -> Kafka (1) -> Worker -> Kafka (2) -> MainWorker -> 前端

由于某些原因,Kafka(1)的生产者和消费者之间有一个意外的 80-100 毫秒,对于 kafka(2)也是如此

云部署与本地部署的延迟相同

生产者 1

生产者/消费者 2

我们尝试了一些方法来减少延迟,但这看起来不会影响延迟。生产者/消费者都需要具有最低的延迟。我们不关心吞吐量。

0 投票
1 回答
254 浏览

python - Kafka 将 txt 文件读取为单个字母而不是单词

我在 python 的 pykafka 中使用 KafkaClient。我正在尝试读取一个文本文件并将其行生成一个主题,然后由消费者读取。然而,在运行时,它只读取消息中的单个字母,而不是文本文件的单词或行。我究竟做错了什么?

我的制片人是

我的消费者是

将不胜感激一些指针。我想知道这是否是读取文本文件并通过 kafka 运行的最佳方式。

0 投票
0 回答
33 浏览

python - Pykafka 客户端忽略 ssh/tls

有没有办法使用带有 usafe ssl 连接/没有正确证书的Pykafka 客户端?

要配置任何与 ssl 相关的东西,我只找到了内置的PyKafka SslConfig,但它似乎没有公开任何内容来禁用 ssl 验证。

0 投票
1 回答
126 浏览

python-3.x - 尝试通过 pykafka 连接到 kafka 时,在所有 docker 映像中出现 import _rd_kafka 错误

嗨,我尝试了多个 docker 映像,例如 Ubuntu 和 python:3.8-alpine 等。在尝试通过 pykafka 库连接到我的 kafka 集群(2.7)时,我到处都遇到如下错误。

环境信息:

注意:仅当我在外部运行时,即直接从我的机器运行代码在容器内运行时,才会发生此错误,然后它工作正常

0 投票
1 回答
375 浏览

pykafka - confluent-kafka-python 库:每个消费者组每个主题的读取偏移量

由于pykafka EOL,我们正在迁移到confluent-kafka-python。因为pykafka我们编写了一个详细的脚本,它以以下格式生成输出:

话题 消费群体 抵消
topic_alpha total_messages 100
topic_alpha 消费者_a 10
topic_alpha 消费者_b 25

我想知道是否有一个 Python 代码知道如何为confluent-kafka-python?

小字:有一个关于如何读取每个给定 consumer_group 的偏移量的部分示例。但是,我很难在consumer_group不手动解析的情况下获取每个主题的列表__consumer_offsets

0 投票
0 回答
20 浏览

python - pykafka 或 kafka-python 是否支持事务行为?

我想在 python中有一个事务性 kafka 生产者。pykafkakafka-pythonAPI是否支持这一点?