问题标签 [pykafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 现在反馈连接是否有效
我正在尝试使用 pykafka 创建一个简单的客户端。为此,我需要 SSL 证书。客户端在 RHEL 7 和 Python 3.6.x 下运行。看起来连接正常,但我没有得到任何反馈或数据,只有黑屏。如何检查连接或获取错误消息。
python-3.6 - pykafka 在nosetests 集成测试上的死锁
我们在 python3.6 上遇到了 pykafka 2.7.0 的问题。有一些集成测试,出于某种原因,在我们生成的主题上形成了死锁。在所有测试完成后,nosetests 拒绝完成。当我们在 python2.7 上运行相同的测试时不会发生这种情况,并且我们在 python3.6 上使用了 pykafka 2.8.0 没有任何帮助。
修复它的唯一方法是在向主题生成消息后立即停止/删除生产者的临时修复(如您在最后 2 行代码中所见),这会花费大量时间来停止生产者。
如果最后两行不存在,我在 gdb 中看到该进程在垃圾收集后立即卡住,并试图停止生产者。然后它挂在_wait_for_tstate_lock。它阻止了我们在 Jenkins 中的 CI 完成,并且想知道为什么它在垃圾收集上死锁,但在代码中调用时却没有死锁。
python-3.x - 如何动态创建kafka生产者
首先我在python和kafka中做婴儿步骤,所以假设我有一个listA = [item1,item2,item3]并且listA的每个项目都是producer
一个主题。现在我想要的是动态地将项目添加/删除到 listA 并立即成为生产者,而且每个项目都应该在它自己的线程上运行,因为它们应该是独立的。
所以基本上我正在尝试扩展应用程序。
到目前为止,我尝试对每个生产者项目进行硬编码并在其自己的终端中运行它
每一个项目
python - 将 csv 文件写入 kafka 主题
我有一个很大的 csv,我想写一个 kafka 主题。
此代码产生错误:
该文件如下所示:
谁能帮我这个?
python - 高延迟 PyKafka
我们正在制作一个实时产品,它接收图像并发回相关信息。出于可扩展性的目的,我们决定使用 Kafka 来平衡 Kubenertes 节点之间的工作负载。
前端 -> MainWorker -> Kafka (1) -> Worker -> Kafka (2) -> MainWorker -> 前端
由于某些原因,Kafka(1)的生产者和消费者之间有一个意外的 80-100 毫秒,对于 kafka(2)也是如此
云部署与本地部署的延迟相同
生产者 1
生产者/消费者 2
我们尝试了一些方法来减少延迟,但这看起来不会影响延迟。生产者/消费者都需要具有最低的延迟。我们不关心吞吐量。
python - Kafka 将 txt 文件读取为单个字母而不是单词
我在 python 的 pykafka 中使用 KafkaClient。我正在尝试读取一个文本文件并将其行生成一个主题,然后由消费者读取。然而,在运行时,它只读取消息中的单个字母,而不是文本文件的单词或行。我究竟做错了什么?
我的制片人是
我的消费者是
将不胜感激一些指针。我想知道这是否是读取文本文件并通过 kafka 运行的最佳方式。
python - Pykafka 客户端忽略 ssh/tls
有没有办法使用带有 usafe ssl 连接/没有正确证书的Pykafka 客户端?
要配置任何与 ssl 相关的东西,我只找到了内置的PyKafka SslConfig,但它似乎没有公开任何内容来禁用 ssl 验证。
python-3.x - 尝试通过 pykafka 连接到 kafka 时,在所有 docker 映像中出现 import _rd_kafka 错误
嗨,我尝试了多个 docker 映像,例如 Ubuntu 和 python:3.8-alpine 等。在尝试通过 pykafka 库连接到我的 kafka 集群(2.7)时,我到处都遇到如下错误。
环境信息:
注意:仅当我在外部运行时,即直接从我的机器运行代码在容器内运行时,才会发生此错误,然后它工作正常
pykafka - confluent-kafka-python 库:每个消费者组每个主题的读取偏移量
由于pykafka EOL,我们正在迁移到confluent-kafka-python。因为pykafka
我们编写了一个详细的脚本,它以以下格式生成输出:
话题 | 消费群体 | 抵消 |
---|---|---|
topic_alpha | total_messages | 100 |
topic_alpha | 消费者_a | 10 |
topic_alpha | 消费者_b | 25 |
我想知道是否有一个 Python 代码知道如何为confluent-kafka-python
?
小字:有一个关于如何读取每个给定 consumer_group 的偏移量的部分示例。但是,我很难在consumer_group
不手动解析的情况下获取每个主题的列表__consumer_offsets
。
python - pykafka 或 kafka-python 是否支持事务行为?
我想在 python中有一个事务性 kafka 生产者。pykafka
或kafka-python
API是否支持这一点?