问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
122 浏览

apache-kafka - KafkaConsumer 回退到最旧的可用消息时出现问题

我通过以下代码将 G1 组中的一个 Kafka 消费者的获取偏移量倒回到 off1:

在上面的行中,off1 是对应分区中主题的最旧可用消息的偏移量。

现在我在不同的 G2 组中实例化一个 Kafka 消费者,如下所示:

在这里,我将读取偏移量读取为 off1,这与我为 G1 组中的 consumer1 重置的偏移量相同。我认为这不应该发生,因为不同的组偏移量应该不同。如果有人澄清,我将非常感激。提前致谢。

0 投票
1 回答
6505 浏览

python - 更新 Kafka 主题中的消息

我正在使用 Python Kafka 主题。

是否有任何供应生产者可以更新 Kafka 队列中的消息并将其再次附加到队列顶部?

根据 Kafka 的规范,这似乎不可行。

0 投票
2 回答
1908 浏览

docker - 创建主题,但在 Kubernetes 上使用 Python 获取 Kafka FailedPayloadsError

我在 python kafka-library 中使用 SimpleProducer。该脚本以前与我尝试过的其他更硬配置的 kafka-setup 完美配合。

运行此脚本一次后,我在 python 控制台中收到此响应。

然后我可以在我的 zookeeper.log 上进入我的节点并查看:

这似乎只是 Zookeeper 为该主题创建了一个新的 Znode,因为它以前不存在。Kafka server.log 打印:

但是,我的消息从未发布到该主题,并且下次我运行 python 脚本时,我总是得到:

在我让它工作的情况下,advertised.host.name 始终是节点的外部 IP,但我似乎无法通过 Kubernetes 让它工作。是否可以从容器中调用外部 IP?

我的 kafka/config/server.properties 对于所有经纪人来说都是这样的:

0 投票
2 回答
2177 浏览

message-queue - Kafka 主题或分区级别的并行性

为了分离我的数据,基于一个键:我应该在同一个主题中使用多个主题还是多个分区?我是根据服务器上引起的开销、计算、数据存储和负载来询问的。

0 投票
2 回答
6767 浏览

python - Python如何删除Kafka主题下的所有消息

我是卡夫卡的新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,也就是说前一天的数据已经过时了。如何在 python 中删除 Kafka 主题下的所有消息?或者我怎么能在 python 中删除 Kafka 主题?或者我看到有人建议等待数据过期,如果可以的话,我该如何设置数据过期时间?任何建议将不胜感激!

谢谢

0 投票
1 回答
187 浏览

python - 将 PubMed 数据推送到 Kafka

在 PubMed 数据源中,我需要将输出推送到 Kafka 队列中。每个源都可以视为一个 Kafka 主题。(我知道 Kafka 中的概念,并使用 Python 探索了 Kafka)

我可以通过 FireFTP 查看 PubMed 数据。

任何人都可以帮助如何继续前进吗?

0 投票
1 回答
1455 浏览

java - 使用 Kafka 低级 API,我应该在完成获取数据后提交偏移量吗?

我在 Kafka 源代码中找到了这种方法。我应该使用它吗?

0 投票
1 回答
233 浏览

python - 如何通过函数循环进入python中的方法调用

我正在尝试为 python kafka 创建一个简单的函数,但在将字符串循环传递给 producer.send_messages 方法时遇到了麻烦。

我会以这个错误结束,

我很欣赏任何建议,以使其成为一种更动态的方式(无需硬编码 b'messages'...等)以将消息也放入 kafka。:D

0 投票
1 回答
1261 浏览

python - 无法摆脱 Python 中 kafka 消费者守护进程的无限循环

我写了一个程序来消费 kafka 事件。它有一个守护进程,我想在 10 秒后终止。

请忽略缩进。
但是这个程序并没有在 10 秒后终止。想知道我在这里想念什么吗?

0 投票
1 回答
129 浏览

python - 如何提高以下代码性能以摄取 100 万条记录/秒

以下代码每秒摄取 10k-20k 条记录,我想提高它的性能。我正在阅读 json 格式并使用 Kafka 将其摄取到数据库中。-我在安装了 zookeeper 和 Kafka 的五个节点的集群上运行它。

你能给我一些改进的建议吗?