问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4255 浏览

apache-kafka - kafka-python中的多处理

我一直在使用 python-kaka 模块从 kafka 代理中消费。我想从同一主题中并行使用“x”个分区。文档有这个:

这是否意味着我可以为我生成的每个进程创建一个单独的消费者?另外,consumer1 和 consumer2 使用的消息是否会重叠?

谢谢

0 投票
0 回答
816 浏览

python - Kafka Consumer Hangs on Startup

Due to one reason or another, we recently re-wrote our consumers and producers using different libraries than initially written in. However, I've been having some issues after the switch.

Using Kafka 0.9.0.2:
8 partitions

  • We consume from one topic, process the message, and push to another topic.

Due to extensive processing causing session timeouts before being able to commit offsets, the following config options were updated:
Consumer: session.timeout.ms: 1m40s
Broker: group.max.session.timeout.ms: 2m and group.min.session.timeout.ms: 6s

The issue I'm having is that once every several startups of my Consumer, it seems to hang while trying to fetch messages.
No errors or exceptions, it doesn't eventually timeout, it just sits. Considering my process which implements the consumer restarts every so often, this is a breaking problem, and I'm out of ideas. I don't know if it's a config update that needs to be made, or if I'm not handling shutdown properly, which is causing some sort of timeout to be exceeded on the broker.

  1. I add 100,000 messages to Kafka.
  2. I turn on this service and allow consumption of ~1000 messages.
  3. I restart the process to simulate what's happening.
  4. The process hangs, output: Consumer created

Has anyone seen their consumers hang on startup?
Is there anything I should look for in Kafka logs?

I'm also noticing consumption from the partitions to be increasingly slow.

0 投票
4 回答
9046 浏览

python - Python kafka消费者组ID问题

AFAIK,

kafka 中引入了分区和(消费者)组的概念来实现并行。我正在通过python使用kafka。我有一个特定的主题,它有(比如说)2 个分区。这意味着,如果我启动一个包含 2 个消费者的消费者组,他们将被映射(订阅)到不同的分区。

但是,kafka在 python 中使用库时,我遇到了一个奇怪的问题。我启动了 2 个具有基本相同组 ID 的消费者,并启动了线程让他们消费消息。

但是,kafka-stream 中的每条消息都被他们俩消费了!!这对我来说似乎很荒谬,甚至在概念上也不正确。无论如何我可以手动将消费者映射到某些(不同的)分区(如果它们没有自动映射到不同的分区)?

这是代码:

这是我使用 kafka-console-producer 生成的一些消息的输出:

而预期是其中之一。顺便说一句,这个主题k-test有 2 个分区。

0 投票
1 回答
15079 浏览

apache-kafka - 尝试连接到 Kafka 时没有可用的代理错误

尝试在 CentOS 上使用 Python 客户端本地连接到 Kafka 0.10.0.0 时,我遇到了一个非常奇怪的问题。

我的连接选项非常简单且默认:

当我在 Kafka 的 server.properties 文件中手动设置listeners选项时,例如:

我得到了 kafka.errors.NoBrokersAvailable,尽管我仍然可以使用 curl 或其他 linux 东西轻松连接到 Kafka 代理服务器。

没有adverted.listeners或其他不推荐使用的广告选项有助于解决问题。因此,唯一有效的配置状态是没有侦听器的状态。这当然是不可接受的,因为我们需要以某种方式设置本地集群。

似乎这个愚蠢问题的解决方案很简单,并且正在思考,但我们自己无法弄清楚。

0 投票
1 回答
1375 浏览

java - 使用 Apache Kafka 0.10.0 API 和 Java 创建一个 Kafka 代理集群

我想使用Kafka 0.10 API优选的 with创建一个代理集群Java。据我所知,kafka_2.11-0.10.0.0.jar支持使用以下方法创建代理:

但我找不到任何这样做的文档。我最近阅读了 [1],其中介绍了如何使用Kafka APIin创建主题Java。我们可以做类似的事情来 创建代理集群,更新分区,将现有数据/分区迁移到新代理(因为这些新代理不会自动分配任何数据分区,因此除非将分区移动到它们,否则它们将不会做任何工作[2])

[1]我们如何从 IDE 使用 API 在 Kafka 中创建主题

[2] https://kafka.apache.org/0100/ops.html#basic_ops_cluster_expansion

0 投票
1 回答
1164 浏览

apache-kafka - uwsgi python应用程序中的python-kafka生产者超时

当我的烧瓶应用程序与 uwsgi/nginx 一起运行时,我在与我的 kafka 服务器通信时遇到问题。当我在命令行上使用 python 启动应用程序时,一切正常。但是当我用 uwsgi Emperor 运行它时,我的制作人正在超时等待响应。创建生产者按预期工作;我只是在尝试发送新消息时遇到问题。我已经验证消息永远不会到达服务器,但是等待响应时抛出的异常只是“等待 5 秒后超时”。

如何解决此问题?应用程序中的一切工作正常,我只是无法发送任何 kafka 消息。我是否需要任何特殊配置以允许应用程序通过其他套接字进行通信?

我尝试过的事情:

  • 在我的 ini 配置中设置 close-on-exec
  • 禁用线程
  • 仅限于单个进程
0 投票
6 回答
41872 浏览

python - NoBrokersAvailable:NoBrokersAvailable-Kafka 错误

我已经开始学习卡夫卡了。尝试对其进行基本操作。我坚持关于“经纪人”的观点。

我的 kafka 正在运行,但是当我想创建一个分区时。

回溯(最近一次调用):文件“”,第 1 行,在文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”中,第 284 行,在init self._client = KafkaClient(metrics=self._metrics, **self.config) 文件“/usr/local/lib/python2.7/dist-packages/kafka/client_async.py”,第 202 行,在init self.config['api_version '] = self.check_version(timeout=check_timeout) 文件“/usr/local/lib/python2.7/dist-packages/kafka/client_async.py”,第 791 行,在 check_version 中引发 Errors.NoBrokersAvailable() kafka.errors。没有经纪人可用:没有经纪人可用

0 投票
3 回答
4781 浏览

apache-kafka - KafkaException:错误的请求类型 18

我最近尝试使用来自 Kafka 主题的消息,并收到以下错误消息:

正如您可能从错误消息中看出的那样,kafka 代理已映射到 localhost - 但除此之外,我无法真正解析它,而且很难找到错误消息“错误请求类型 18”的详细信息。

其他人有这个问题吗?

0 投票
1 回答
623 浏览

python - 如何结合两个 DStreams(pyspark)?

我有一个带有一些输入主题的 kafka 流。这是我为接受 kafka 流而编写的代码。

然后我创建原始流的键和值的两个 DStream。

然后我在值 DStream 中执行一些计算。例如,

现在,我需要将键和 val DStream 结合起来,并以 Kafka 流的形式返回结果。

如何将 val 与相应的键结合起来?

0 投票
1 回答
1689 浏览

python - python kafka:如何使每个味精从一开始就按组只消耗一次

我在这里使用Kafka 消费者(版本 1.3.1)。

我要实现的目标:

  • 有10个分区。每个分区都从偏移量 0 开始。

  • 有一组消费者(例如,1、2、3)。

  • 有时,一个消费者下降或上升。

  • 因此,小组成员可能会发生变化。但是我希望每个分区中的每条消息都应该只被组消费一次(1 OR 2 OR 3)。

我的代码是:

以上配置够吗?欢迎任何意见。谢谢

更新

我尝试了以下代码。每次在分区 760 中,每条消息可能被一组中的两个消费者消费两次。为什么?有问题?

输出 1:

在另一个窗口中运行相同的文件,输出: