问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1359 浏览

python-3.x - Kafka Python 客户端 - 如何处理可能的连接/超时错误?

我正在编写一个 Python 应用程序,它连接到 Kafka 队列并向队列发送消息。我有以下工作代码:

我想在这段代码中添加标准的 try-catch 表达式,这样我就可以捕获我可能遇到的任何类型的异常。连接、超时、NoBrokersAvailable 等各种异常是什么?我应该如何正确处理它们?

提前致谢!

0 投票
0 回答
824 浏览

python - 我们可以在 python 中以编程方式在 kafka 主题中创建分区吗?

我正在使用 kafka_2.11-0.9.0.0 并且我想将来自各种数据源的数据发送到主题中的各个分区,以在数据到达时以编程方式创建分区。卡夫卡可以做到这一点吗?如果是这样,怎么做?

0 投票
2 回答
1343 浏览

python - 出现OverflowError:使用kafka-python producer-consumer时超时值太大

好吧,我正在尝试在 python 中使用 Kafka-python 包(1.3.2)来进行从生产者到消费者的简单数据传输。

制片人:

消费者:

我收到以下关于我的消费者的信息:

my-topic:0:5056: key=None value=b'message' my-topic:0:5057: key=None value=b'message'

但与此同时,我在制片人那里遇到了这个错误:

Error in atexit._run_exitfuncs: Traceback (most recent call last): File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 364, in wrapper _self.close() File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 420, in close self._sender.join(timeout) File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1060, in join self._wait_for_tstate_lock(timeout=max(timeout, 0)) File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1072, in _wait_for_tstate_lock elif lock.acquire(block, timeout): OverflowError: timeout value is too large

默认情况下,超时设置为NONE,并且正在设置为999999999in Kafka.py。我无法弄清楚在 KafkaProducer 中传递此超时的参数 - 在我的生产者代码中。

有没有人遇到过这个问题?或者任何人都可以在这个方向上帮助我。提前致谢。

0 投票
2 回答
430 浏览

python - Bluemix Message Hub - 未能初始化 SASL 身份验证但似乎仍然有效

当我尝试将 Bluemix Message Hub 与 Python 一起使用时,我不断收到错误和失败。任何想法为什么给出以下内容?

错误是“无法初始化 SASL 身份验证:代理不支持 SASL 握手(机制 PLAIN 需要)”

但是,当运行我的生产者和消费者时,它们似乎可以工作。产生消息并消费消息。

我在输出中看到与错误消息混合的正确消息。

我已经安装了 librdkafka 并按照说明确认一切正常:

https://github.com/ibm-messaging/message-hub-samples/blob/master/docs/librdkafka.md

当我运行 ./config 时,您可以在此处看到我的输出:

谢谢,亚伦

0 投票
1 回答
909 浏览

python - Unable to Poll for Binary Messages with `kafka-python`

I have a Kafka topic that is receiving binary data (raw packet capture data). I can confirm that it is indeed landing data using the Kafka CLI tools. I receive multiple messages each second.

But when I use kafka-python, I cannot retrieve any messages. The poll method simply returns no results.

I have been able to use kafka-python to pull messages from a separate topic that contains just text strings.

I am curious if somehow internally kafka-python is dropping the messages because they are binary and failing some sort of validation. How can I dig deeper and see why no messages can be retrieved?

0 投票
2 回答
13450 浏览

python - Kafka 10 - 具有身份验证和授权的 Python 客户端

我有一个启用了 SASL_SSL(身份验证(JAAS)和授权)的 Kafka10 集群。能够使用带有以下道具的 Java 客户端通过 SASL 连接。

并通过 JVM 参数传递 JAAS conf 文件。

无论如何用python客户端实现同样的事情吗?

0 投票
1 回答
3251 浏览

apache-kafka - 在不同的服务器上消费和生产 Kafka 消息

如何生成和使用来自不同服务器的消息?我尝试了快速入门教程,但没有关于如何设置多服务器集群的说明。

我的步骤
服务器 A
1) bin/zookeeper-server-start.sh config/zookeeper.properties
2) bin/kafka-server-start.sh config/server.properties
3) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4)bin/kafka-console-producer.sh --broker-list SERVER-a.IP:9092 --topic test

服务器 B
1A) bin/kafka-console-consumer.sh --bootstrap-server SERVER-a.IP:9092 --topic test --from-beginning
1B)bin/kafka-console-consumer.sh --bootstrap-server SERVER-a.IP:2181 --topic test --from-beginning

当我运行1A)消费者并将消息输入生产者时,消费者中没有消息出现。它只是空白。
相反,当我运行1B消费者时,我会在服务器 A 中获得大量且非常快速的错误日志流,直到我 Ctrl+C 消费者。请参阅下面
的服务器 A 上的错误日志以每秒数百次的速度流式传输
WARN Exception causing close of session 0x0 due to java.io.EOFException (org.apache.zookeeper.server.NIOServerCnxn)
O Closed socket connection for client /188.166.178.40:51168 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn) 谢谢

0 投票
4 回答
11267 浏览

python - kafka-python 在消费者重启后从最后产生的消息中读取

我正在使用kafka-python来使用来自 kafka 队列(kafka 版本 0.10.2.0)的消息。特别是我正在使用KafkaConsumer类型。如果消费者停止并在一段时间后重新启动,我想从最新产生的消息重新启动,即删除消费者关闭期间产生的所有消息。我怎样才能做到这一点?

谢谢

0 投票
3 回答
9595 浏览

python - 无法使用 Kafka-Python 的反序列化器使用来自 Kafka 的 JSON 消息

我正在尝试通过 Kafka 发送一个非常简单的 JSON 对象,并使用 Python 和 kafka-python 将其读出另一端。但是,我不断看到以下错误:

我做了一些研究,这个错误的最常见原因是 JSON 错误。我尝试在发送之前打印出 JSON,方法是将以下内容添加到我的代码中,并且 JSON 打印没有错误。

这让我怀疑我可以生成 json,但不能使用它。

这是我的代码:

如果删除 value_serializer 和 value_deserializer,我可以成功发送和接收字符串。当我运行该代码时,我可以看到我发送的 JSON。这是一个简短的片段:

所以我尝试从消费者中删除 value_deserializer,并且该代码执行但没有反序列化器,消息以字符串形式出现,这不是我需要的。那么,为什么 value_deserializer 不起作用?是否有其他方法可以从我应该使用的 Kafka 消息中获取 JSON?

0 投票
2 回答
3171 浏览

amazon-web-services - 从我的本地机器连接在 EC2 机器上运行的 Kafka

我是 Kafka 新手,在论坛中搜索了不同的帖子,但找不到解决方案。我已经在 EC2 实例上安装了 kafka,并尝试从我的 ubuntu 本地机器上连接它。我的目标是让 python kafka 客户端(生产者和消费者)在我的本地机器上运行并通过 EC2 kafka 实例发送/接收数据。那可能吗?

server.properties 配置文件中设置的属性:

在 Kafka EC2 实例上:

在 Kafka EC2 实例上的 Zookeeper cli 上:

我本地机器上的 Python 客户端(生产者):

我本地机器上的 Python 客户端(消费者):

但是,生产者无法连接到 Kafka EC2 实例并且失败并出现以下错误:

我的安全组规则请参考链接:

goo.gl/ZUVknv

在我的本地机器上以调试模式运行 Producer:

我尝试在另一个 EC2 实例中运行生产者客户端(在与 kafka 实例相同的 VPN 中),它工作正常。但是,当生产者在我的本地机器上运行时,它不起作用。'advertised.listeners' 属性是否在同一个(AWS VPN)网络中宣传 kafka 经纪人?或者我也可以从我的本地机器连接它?如果有人能指出我正确的方向,请告诉我。