问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
269 浏览

apache-kafka - 为什么写入和读取 Kafka 队列之间存在延迟?

我编写了一个工作服务来使用来自 Kafka 队列的消息,并且我还编写了一个测试脚本来每隔几秒钟将消息添加到队列中。

我注意到的是,当消息被添加到队列中时,消费者通常会一次闲置几分钟。然后突然消费者会拿起第一条消息,处理它,然后迅速转移到其余部分。所以它最终赶上了,但我想知道为什么首先会有这样的延迟?

0 投票
3 回答
1906 浏览

python - Kafka Consumer 没有收到来自 Producer 的任何消息

以下是我为 kafka 生产者编写的 python 编码,我不确定消息是否能够发布到 Kafka Broker。因为消费者端没有收到任何消息。当我使用生产者控制台命令对其进行测试时,我的消费者 python 程序运行良好。

这是我的“消费者”python 编码

0 投票
2 回答
2386 浏览

python - python kafka库的编码/格式化问题

我一直在尝试使用python kafka库,但无法让生产者工作。

经过一番研究,我发现 kafka 向消费者发送了(我猜也是预期的)一个额外的 5 字节标头(一个 0 字节,一个长包含模式注册表的模式 id)。我已经设法通过简单地剥离第一个字节来让消费者工作。

在编写制片人时,我应该在前面加上一个类似的标题吗?

下面出现的异常:

我正在使用 kafka 和 python-kafka 的最新稳定版本。

编辑

消费者

制片人

0 投票
2 回答
21786 浏览

python - 如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?

我正在使用带有通配符的模式订阅 Kafka,如下所示。通配符代表动态客户 ID。

这很好用,因为我可以从主题字符串中提取客户 ID。但是现在我需要扩展功能以听一个类似的主题,目的略有不同。让我们称之为customer.*.additional-validations。代码需要存在于同一个项目中,因为共享了太多功能,但我需要能够根据队列的类型采用不同的路径。

Kafka 文档中,我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵活性的模式。

所以我想知道是否有可能以某种方式将两者结合起来?有点像这样(非工作):

0 投票
1 回答
109 浏览

python - 单个生产者到多个消费者(相同的消费者组)

在将消息从单个生产者发送到具有不同消费者组 ID 的 2 个不同消费者之前,我已经尝试过。结果是两个消费者都能够阅读完整的消息(两个消费者都得到相同的消息)。但是我想问一下,这两个消费者是否可以在将它们设置为相同的消费者组名称时读取不同的消息?

0 投票
1 回答
3594 浏览

apache-kafka - 卡夫卡传递重复消息

我们使用 kafka(0.9.0.0) 来编排不同微服务之间的命令消息。我们发现了一个间歇性问题,其中重复消息被传递到特定主题。下面给出了发生此问题时发生的日志。有人可以帮助理解这个问题吗

0 投票
1 回答
3647 浏览

apache-kafka - kafka-python 消费者给出错误

我对 kafka 和 kafka-python 相当陌生。安装 kafka-python 后,我尝试从这里简单地实现消费者代码 - http://kafka-python.readthedocs.io/en/master/usage.html

我一直在 kafka 的 bin 目录中编写消费者代码,并尝试从那里运行 python 代码。但是我收到以下错误:

Traceback(最近一次调用最后一次):文件“KafkaConsumer.py”,第 4 行,用于消费者中的消息:文件“/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py”,第 559 行,在下一个返回类型(self)中。下一个(自我)文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”,第 915 行,在下一个 返回下一个(self._iterator)文件“/usr/local/lib /python2.7/dist-packages/kafka/consumer/group.py”,第 876 行,在 _message_generator for msg in self._fetcher:文件“/usr/local/lib/python2.7/dist-packages/kafka/vendor /six.py”,第 559 行,在下一个返回类型(self)中。下一个(自我)文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第 520 行, 返回下一个(self._iterator)文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第477行,在self._unpack_message_set(tp,messages)的_message_generator中为msg:文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第 372 行,在 _unpack_message_set inner_mset = msg.decompress() 文件“/usr/local/lib/python2.7 /dist-packages/kafka/protocol/message.py",第 121 行,在解压缩中断言 has_snappy(),'Snappy decompression unsupported' AssertionError: Snappy decompression unsupported

这是我一直试图运行的代码:

因为,我对 Kafka 真的很陌生,所以我很难理解我做错了什么。

0 投票
0 回答
860 浏览

apache-kafka - Producer 无法通过 DNS 连接到 broker

我有一个物理服务器,在那里我将advertised.host.name 设置为服务器ip,并在路由器上进行端口转发。但是生产者无法使用 dns 连接到代理。

错误:pykafka.connection:无法连接到 192.168.1.3:9092 警告:pykafka.producer:Broker 192.168.1.3:9092 已断开连接。重试。

0 投票
1 回答
1741 浏览

python - 使用 python 的 Kafka 生产者:TypeError:所有产生的消息有效负载必须为空或类型字节

我刚开始学习 Python 和 Kafka。这是我尝试开始的第一个示例。 http://www.giantflyingsaucer.com/blog/?p=5541

我有一个例外:

我在谷歌上搜索过,但我不太确定问题是什么。谁能给我一些建议?非常感谢!

这是我的代码:

0 投票
3 回答
10072 浏览

python - Python:如何为单元测试模拟 kafka 主题?

我们有一个消息调度程序,它在将消息属性放入带有该键的 Kafka 主题队列之前从消息属性生成一个哈希键。

这样做是出于重复数据删除的目的。但是,我不确定如何在不实际设置本地集群并检查它是否按预期执行的情况下测试这种重复数据删除。

在线搜索用于模拟 Kafka 主题队列的工具并没有帮助,我担心我可能以错误的方式思考这个问题。

最终,任何用于模拟 Kafka 队列的行为都应该与本地集群的行为方式相同 - 即通过向主题队列插入密钥来提供重复数据删除。

有没有这样的工具?