问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
3911 浏览

docker - 在 alpine 容器中使用 confluent-kafka python 客户端

我正在尝试运行一个与 kafka 通信的简单 python 应用程序。我正在寻找使用高山容器。这是我当前的 dockerfile (它不是最佳的......只是想让事情暂时正常)。

需求文件中有 confluent-kafka。构建失败

我的问题是 a) 有没有办法让这个工作而不在容器内构建?如果我可以简单地将库复制到 alpine 就足够了。或者即使我可以复制 librdkafka。b)如果没有,我怎样才能让 libssl 和 libcryto.so 工作?

0 投票
3 回答
4451 浏览

apache-kafka - 对于 AvroProducer 到 Kafka,“键”和“值”的 avro 模式在哪里?

confluent-kafka-python repoAvroProducer中的示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:

似乎这些文件是独立于 Avro Schema Registry 加载的ValueSchema.avscKeySchema.avsc

这是正确的吗?引用 Avro 模式注册表的 URL,然后从磁盘加载模式以获取键/值有什么意义?

请说清楚。

0 投票
1 回答
953 浏览

apache-kafka - 无法从日期创建 Avro Schema 对象

我有一个具有此属性的 Avro 架构:

我正在使用 Python 客户端和生产者

当我使用上述属性加载 Avro 模式时,我触发了这个错误:

我认为这个错误说明了 Avro 模式的 Confluent 解析器的限制。也就是说,是此处date指定的逻辑类型,但 Confluent 解析器尚不支持它。

任何人都可以确认吗?更一般地说,我应该假设 Confluent 解析器不处理 Avro 模式的逻辑类型吗?谢谢。

0 投票
1 回答
73 浏览

apache-kafka - kafka 生产者可以在存在副本的情况下以配额速率提供数据吗?

我有一个属于客户端的 kafka 生产者,其客户端 ID -“p1”,配额为 50 MBps。

现在我使用“bin/kafka-producer-perf-test.sh”测试了我的生产者的性能,当写入没有副本的分区时,我能够获得接近 50 MBps 的吞吐量。

我在具有三个副本的分区上尝试了相同的实验。但这次吞吐量降低到 30 MBps。

我的问题是,即使存在副本,kafka 是否不应该允许生产者仍然获得 50 MBps 的吞吐量?系统中没有其他任何东西在运行,所以我不确定为什么会这样?

0 投票
1 回答
1281 浏览

ibm-cloud - kafka python - Bluemix MessageHub - ConnectionError:套接字已断开连接

我正在使用 kafka python 客户端将消息推送到 Message Hub,但注意到在运行我的应用程序一段时间后它会停止向 Message Hub 发送消息。

然后我在我的日志文件中注意到以下内容:

我更新了我的代码以添加retries=5

我的代码从 python 烧瓶应用程序运行。每个传入的特定类型的请求都会调用该send_message()方法。

以下是来自 Bluemix 的相关日志行。我可能错过了一两行复制和粘贴,但希望这足以弄清楚发生了什么:

我的话题存在。我的完整客户端代码在这里:https ://github.com/snowch/movie-recommender-demo/blob/effc981cc9f799c41952719619f693172eebcd6a/web_app/app/messagehub_client.py

任何最受赞赏的指针...

0 投票
1 回答
1530 浏览

python - Kafka 生产者没有选择新的分区

我是 Kafka 的新手,正在尝试在其上构建服务消息传递平台的服务。这是我的设置:

卡夫卡 0.9.0.1
动物园管理员 3.4.8卡夫卡
-蟒蛇1.3.3

我的应用程序创建了一个KafkaProducer,我从中将消息流发送到具有 6 个分区的单个主题。我还创建了 7 个KafkaConsumer(在一个 s 下group_id,其中 6 个分配给 6 个分区,一个处于空闲状态(这是预期的)。当生产者正在流式传输时,我将分区计数增加到 7,并期望流不会分布在 7 个分区上并且会唤醒空闲的消费者。但是,在我通过重新启动应用程序重新初始化它之前,生产者似乎不会拿起新添加的分区。我缩放分区计数通过运行:

kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7

生产者有没有办法在不重新初始化的情况下获取分区计数的变化?

以下是相关的代码片段:

制片人

消费者

0 投票
1 回答
350 浏览

apache-kafka - Kafka Producer 停止了我的代码

我正在调用一个从 kafka 生产者发送一些数据的函数,但是在它发送之后我返回了一个不返回的响应。代码在返回时卡住了。有人知道发生了什么吗?

我的代码如下,

0 投票
1 回答
161 浏览

apache-kafka - 运行 Kafka-python 会导致整个计算机冻结(ESXi 6.5 VM 虚拟机冻结内核恐慌)

我正在使用 Zookeeper 和 kafka (0.8.2.1) 运行 Python2.7。我正在使用最新的 Kafka-python 客户端(pip install kafka)。

以下是一个最小可重现的示例:

这将打印“即将打印消息”,然后整个 VM(Ubuntu Xenial,16.04 LTS)冻结。我什至看不到 VM 内的光标。

有趣的是,我在我的另一台计算机(不是虚拟机)上运行完全相同的 Ubuntu 映像并且它可以工作。虚拟机在 ESXi 6.5 上运行。

这个答案PyCharm freeze whole computer on Ubuntu说这可能是 openjdk 的错(我不使用 pycharm;我从终端执行代码),所以我切换到 Oraclejdk。没有解决问题。我已经尝试了所有我能想到的东西,但我已经束手无策了(ubuntu trusty 正在下载,我将尝试在该映像上运行它,看看它是否适用于 VM)。任何指针/帮助表示赞赏。谢谢!

更新:如果您在 Ubuntu Xenial (16.04) 或更新版本上也遇到此问题,请尝试使用 Trusty (14.04) 映像。似乎已经为我解决了这个问题,但可能还为时过早。

更新 2我错了。还是不行。似乎是内核中的一个错误。

0 投票
1 回答
604 浏览

python - kafka-python ssl 支持 python < v2.7.9(无属性'SSLContext')

尝试使用 ssl 连接到 kafka 时kafka-python

我收到以下错误:

python v2.7.3在(Debian 7 wheezy)上运行的
ssl.SSLContex属性是从添加的python v2.7.9

似乎kafka-python从 v1.1.0 引入了对 ssl 的支持,但由于ssl.SSLContext使用,它需要 python 版本> = v2.7.9。

有没有办法使用较低版本的 python 和 ssl 启动连接kafka-python
或者以某种方式绕过这个?

也许有人可以建议另一个支持 ssl 连接到 kafka 并支持 python 版本 =< 2.7.3 的包。

0 投票
2 回答
1442 浏览

python - 获取从 python KafkaProducer 发送的消息

我的目标是从非文件源(即在程序中生成或通过 API 发送)获取数据并将其发送到 spark 流。为此,我通过基于python 的方式 KafkaProducer发送数据:

我的问题是从消费者 shell 脚本检查主题时没有出现任何内容:

这里有什么遗漏或错误吗?我是 spark/kafka/messaging 系统的新手,所以任何事情都会有所帮助。Kafka 版本是 0.11.0.0 (Scala 2.11),并且没有对配置文件进行任何更改。