问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2509 浏览

python - 如何使用带有分区和复制的 pykafka 创建新主题?

我希望能够使用pykafka以编程方式在 Kafka 中创建主题。我知道访问 TopicDict 会自动创建一个主题,如果一个主题不存在,但我不知道如何控制分区/副本的数量。此外,它还有一个令人讨厌的错误,如果 Kafka 出现故障,它最终会陷入无限循环。基本上我想做如下的事情:

0 投票
1 回答
9953 浏览

python - Kafka python消费者在启动时读取所有消息

我正在使用以下代码从主题中读取消息。我面临两个问题。每当我启动消费者时,它正在读取队列中的所有消息?如何只阅读未读消息?

0 投票
2 回答
372 浏览

apache-kafka - 如何使用火花流区分在 kafka 中收到的主题

我正在使用以下代码从 kafka 获取消息

斯卡拉代码:

这是我的示例生产者代码。

我的流媒体接收器打印

问题:

编辑:使用 keyedProducer

这让我犯了错误

0 投票
3 回答
9421 浏览

apache-spark - 如何将数据从 Kafka 传递到 Spark Streaming?

我正在尝试将数据从 kafka 传递到 spark 流。

这是我到目前为止所做的:

  1. 安装了两个kafkaspark
  2. zookeeper从默认属性配置开始
  3. kafka server从默认属性配置开始
  4. 开始kafka producer
  5. 开始kafka consumer
  6. 从生产者向消费者发送消息。工作正常。
  7. 写了kafka-spark.py来接收来自 kafka 的消息来激发火花。
  8. 我试着跑步./bin/spark-submit examples/src/main/python/kafka-spark.py
  9. 我得到一个错误。

kafka-spark.py -

完整日志,包括运行 spark-kafka.py 的错误:

编辑

在运行时,./bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar examples/src/main/python/kafka-spark.py我得到 HEXADECIMAL 位置而不是实际字符串:

知道我在做什么错吗?我对 kakfa 和 spark 很陌生,所以我需要一些帮助。谢谢!

0 投票
1 回答
4334 浏览

apache-kafka - pykafka 无法连接 kafka 代理

当我使用通过以下代码pykafka连接集群时:kafka

我得到以下异常:

raise Exception('无法连接到代理来获取元数据。')

例外:无法连接到代理以获取元数据。

但是当我使用命令行时,例如:

kafka-console-producer --broker-list 10.0.0.101:9092 --topic userCND

它工作正常,但只是给我一个警告信息:

WARN 属性主题无效 (kafka.utils.VerifiableProperties)

0 投票
0 回答
1686 浏览

kafka-python - 异常 AttributeError:“'KafkaProducer' 对象没有属性 '_closed'”

我是使用 python 生成消息,但我得到了这个异常,在我的脚本下面是..

`from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='192.168.1.6:9092') producer.send('test', b'Welcome Nagarajan').get(timeout=60)

收到错误消息...

回溯(最后一次调用):文件“”,第 1 行,文件“/usr/local/lib/python2.7/dist-packages/kafka/producer/kafka.py”,第 245 行,init self.config ['api_version'] = client.check_version() 文件“/usr/local/lib/python2.7/dist-packages/kafka/client_async.py”,第 607 行,在 check_version connect() 文件“/usr/local/ lib/python2.7/dist-packages/kafka/client_async.py”,第 575 行,在连接中引发 Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: None

0 投票
6 回答
17230 浏览

python - kafka-python 消费者未收到消息

我无法KafaConsumer让它从头开始读取,或者从任何其他显式偏移量读取。

为同一主题的消费者运行命令行工具,我确实看到带有该--from-beginning选项的消息,否则它会挂起

如果我通过 python 运行它,它会挂起,我怀疑这是由不正确的消费者配置引起的

输出:

使用来自给定主题的消息(之后挂起)

我使用的是 kafka-python 0.9.5,代理运行的是 kafka 8.2。不确定确切的问题是什么。

按照 dpkp 的建议设置 _group_id=None_ 以模拟控制台使用者的行为。

0 投票
2 回答
9905 浏览

python - Flask - 拉取实时流 kafka 数据 - 将 Kafka 与 Python Flask 集成

这个项目是为了real time search engine - log analysis表演。

我有一个从 Spark 处理到 Kafka 的实时流数据。

现在有了 Kafka 输出,我想get the data from the Kafka using Flask.. 和visualize it using Chartjs/或其他一些可视化..

如何从中获取实时流数据Kafka using the python flask

知道我该如何开始吗?

任何帮助将不胜感激!

谢谢!

0 投票
2 回答
2431 浏览

python - 如何在龙卷风上使用卡夫卡?

我正在尝试基于此使用龙卷风制作一个简单的聊天应用程序

但我也想使用 kafka 来存储消息。我怎样才能做到这一点?

现在,我用来制作消费者,它以某种方式工作,但它只在控制台上打印,我需要在网页上显示消息,比如 tornade 应用程序,只是它保存在 kafka 中。

这是我现在的 app.py 代码

0 投票
1 回答
2168 浏览

python - 为什么我的 Kafka 消费者比我的 Kafka 生产者慢得多?

我得到了一个可以完全抓取的数据流。数据全部放入 Kafka,然后发送到 Cassandra。现在卡夫卡消费者非常慢,比生产者慢得多。我希望它们完全一样。我该怎么做才能达到这个结果或我的代码有什么问题?

这是我在 python 中的 Kafka 消费者代码:

编辑:

我还添加了我的个人资料结果,代码行似乎很慢

感谢你的回复。