问题标签 [kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-3.x - Kafka Python 客户端 - 如何处理可能的连接/超时错误?
我正在编写一个 Python 应用程序,它连接到 Kafka 队列并向队列发送消息。我有以下工作代码:
我想在这段代码中添加标准的 try-catch 表达式,这样我就可以捕获我可能遇到的任何类型的异常。连接、超时、NoBrokersAvailable 等各种异常是什么?我应该如何正确处理它们?
提前致谢!
python - 我们可以在 python 中以编程方式在 kafka 主题中创建分区吗?
我正在使用 kafka_2.11-0.9.0.0 并且我想将来自各种数据源的数据发送到主题中的各个分区,以在数据到达时以编程方式创建分区。卡夫卡可以做到这一点吗?如果是这样,怎么做?
python - 出现OverflowError:使用kafka-python producer-consumer时超时值太大
好吧,我正在尝试在 python 中使用 Kafka-python 包(1.3.2)来进行从生产者到消费者的简单数据传输。
制片人:
消费者:
我收到以下关于我的消费者的信息:
my-topic:0:5056: key=None value=b'message'
my-topic:0:5057: key=None value=b'message'
但与此同时,我在制片人那里遇到了这个错误:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 364, in wrapper
_self.close()
File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 420, in close
self._sender.join(timeout)
File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1060, in join
self._wait_for_tstate_lock(timeout=max(timeout, 0))
File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
OverflowError: timeout value is too large
默认情况下,超时设置为NONE
,并且正在设置为999999999
in Kafka.py
。我无法弄清楚在 KafkaProducer 中传递此超时的参数 - 在我的生产者代码中。
有没有人遇到过这个问题?或者任何人都可以在这个方向上帮助我。提前致谢。
python - Bluemix Message Hub - 未能初始化 SASL 身份验证但似乎仍然有效
当我尝试将 Bluemix Message Hub 与 Python 一起使用时,我不断收到错误和失败。任何想法为什么给出以下内容?
错误是“无法初始化 SASL 身份验证:代理不支持 SASL 握手(机制 PLAIN 需要)”
但是,当运行我的生产者和消费者时,它们似乎可以工作。产生消息并消费消息。
我在输出中看到与错误消息混合的正确消息。
我已经安装了 librdkafka 并按照说明确认一切正常:
https://github.com/ibm-messaging/message-hub-samples/blob/master/docs/librdkafka.md
当我运行 ./config 时,您可以在此处看到我的输出:
谢谢,亚伦
python - Unable to Poll for Binary Messages with `kafka-python`
I have a Kafka topic that is receiving binary data (raw packet capture data). I can confirm that it is indeed landing data using the Kafka CLI tools. I receive multiple messages each second.
But when I use kafka-python, I cannot retrieve any messages. The poll
method simply returns no results.
I have been able to use kafka-python to pull messages from a separate topic that contains just text strings.
I am curious if somehow internally kafka-python is dropping the messages because they are binary and failing some sort of validation. How can I dig deeper and see why no messages can be retrieved?
python - Kafka 10 - 具有身份验证和授权的 Python 客户端
我有一个启用了 SASL_SSL(身份验证(JAAS)和授权)的 Kafka10 集群。能够使用带有以下道具的 Java 客户端通过 SASL 连接。
并通过 JVM 参数传递 JAAS conf 文件。
无论如何用python客户端实现同样的事情吗?
apache-kafka - 在不同的服务器上消费和生产 Kafka 消息
如何生成和使用来自不同服务器的消息?我尝试了快速入门教程,但没有关于如何设置多服务器集群的说明。
我的步骤
服务器 A
1) bin/zookeeper-server-start.sh config/zookeeper.properties
2) bin/kafka-server-start.sh config/server.properties
3) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor
1 --partitions 1 --topic test
4)bin/kafka-console-producer.sh --broker-list SERVER-a.IP:9092 --topic test
服务器 B
1A) bin/kafka-console-consumer.sh --bootstrap-server SERVER-a.IP:9092 --topic
test --from-beginning
1B)bin/kafka-console-consumer.sh --bootstrap-server SERVER-a.IP:2181 --topic
test --from-beginning
当我运行1A)消费者并将消息输入生产者时,消费者中没有消息出现。它只是空白。
相反,当我运行1B消费者时,我会在服务器 A 中获得大量且非常快速的错误日志流,直到我 Ctrl+C 消费者。请参阅下面
的服务器 A 上的错误日志以每秒数百次的速度流式传输
WARN Exception causing close of session 0x0 due to java.io.EOFException (org.apache.zookeeper.server.NIOServerCnxn)
O Closed socket connection for client /188.166.178.40:51168 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
谢谢
python - kafka-python 在消费者重启后从最后产生的消息中读取
我正在使用kafka-python来使用来自 kafka 队列(kafka 版本 0.10.2.0)的消息。特别是我正在使用KafkaConsumer类型。如果消费者停止并在一段时间后重新启动,我想从最新产生的消息重新启动,即删除消费者关闭期间产生的所有消息。我怎样才能做到这一点?
谢谢
python - 无法使用 Kafka-Python 的反序列化器使用来自 Kafka 的 JSON 消息
我正在尝试通过 Kafka 发送一个非常简单的 JSON 对象,并使用 Python 和 kafka-python 将其读出另一端。但是,我不断看到以下错误:
我做了一些研究,这个错误的最常见原因是 JSON 错误。我尝试在发送之前打印出 JSON,方法是将以下内容添加到我的代码中,并且 JSON 打印没有错误。
这让我怀疑我可以生成 json,但不能使用它。
这是我的代码:
如果删除 value_serializer 和 value_deserializer,我可以成功发送和接收字符串。当我运行该代码时,我可以看到我发送的 JSON。这是一个简短的片段:
所以我尝试从消费者中删除 value_deserializer,并且该代码执行但没有反序列化器,消息以字符串形式出现,这不是我需要的。那么,为什么 value_deserializer 不起作用?是否有其他方法可以从我应该使用的 Kafka 消息中获取 JSON?
amazon-web-services - 从我的本地机器连接在 EC2 机器上运行的 Kafka
我是 Kafka 新手,在论坛中搜索了不同的帖子,但找不到解决方案。我已经在 EC2 实例上安装了 kafka,并尝试从我的 ubuntu 本地机器上连接它。我的目标是让 python kafka 客户端(生产者和消费者)在我的本地机器上运行并通过 EC2 kafka 实例发送/接收数据。那可能吗?
server.properties 配置文件中设置的属性:
在 Kafka EC2 实例上:
在 Kafka EC2 实例上的 Zookeeper cli 上:
我本地机器上的 Python 客户端(生产者):
我本地机器上的 Python 客户端(消费者):
但是,生产者无法连接到 Kafka EC2 实例并且失败并出现以下错误:
我的安全组规则请参考链接:
goo.gl/ZUVknv
在我的本地机器上以调试模式运行 Producer:
我尝试在另一个 EC2 实例中运行生产者客户端(在与 kafka 实例相同的 VPN 中),它工作正常。但是,当生产者在我的本地机器上运行时,它不起作用。'advertised.listeners' 属性是否在同一个(AWS VPN)网络中宣传 kafka 经纪人?或者我也可以从我的本地机器连接它?如果有人能指出我正确的方向,请告诉我。