问题标签 [confluent-kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
156 浏览

apache-kafka - 最佳实践中的 ksqlDB 查询

我目前正在构建一个 ksqlDB 实例,我的目标是以交互模式部署它。

我创建了流和表来为 RocksDB 提供窗口聚合。

我想用 REST API 调用(即 KSQL Rest API 的 Python 包装器)查询缓存,但我不确定这是否是正确的实现方法,因为调用的数量可以达到 1K TPS。

您能否让我知道这是否是反模式,如果是,最佳实践是什么?

谢谢。

0 投票
1 回答
511 浏览

apache-kafka - 如何为 confluent-python 客户端覆盖 kafka 中的默认配置值?

作为初学者,我正在探索 Apache Kafka 和 confluent-kafka-python 客户端。当我尝试从生产者发送简单消息时,消费者能够成功消费消息。以为我会尝试将图像作为有效负载发送。所以继续使用 1MB(png) 图像,我的制作人无法生成消息。我遇到的错误是

虽然我做了一些谷歌搜索发现Kafka - Broker: Message size too largeHow can I send large messages with Kafka (over 15MB)? 所以我修改了我的 server.props(broker side),如下所示:

但我仍然无法解决这个问题。

生产者.py

消费者.py

我需要添加任何参数还是我在配置中遗漏了什么?任何帮助,将不胜感激。

谢谢

0 投票
0 回答
137 浏览

python - 在 Cloud Dataflow 上安装 confluent-kafka 时出错

希望你们一切都好。

我正在尝试部署一个将从 Kafka 摄取数据的 Dataflow 管道。为此,我正在使用 lib confluent-kafka,但出现以下错误。

NameError:名称“confluent_kafka”未定义

我已经用 apt 安装 librdkafka-dev 并在 setup.py 上安装 confluent-kafka 但错误仍然存​​在。

这是我的 setup.py 文件:

关于如何解决这个问题的任何想法?

0 投票
1 回答
55 浏览

kafka-consumer-api - Kafka 消费者仅消费 1 条最近的记录

我正在运行代码,它只消耗该主题的 1 条最新记录。我想要该主题的所有记录。我存储记录的输出文件也有记录的元数据。如何跳过元数据。

0 投票
1 回答
830 浏览

apache-kafka - Apache Kafka 和 JSON 模式

我开始接触 Apache Kafka (Confluent) 并对模式的使用有一些疑问。首先,我对架构用于验证数据的一般理解是否正确?我对模式的理解是,当“生成”数据时,它会检查键和值是否符合预定义的概念并相应地拆分它们。

我目前的技术设置如下:

Python:

在 Confluent 中,我为我的主题设置了一个 json 键模式:

现在,当我生成数据时,Confluent 中的消息仅包含“值”中的内容。Key 和 Header 为空:

基本上,如果我设置了这个模式并没有什么不同,所以我想它只是在我设置它时不起作用。您能帮我解决我的思维方式错误或我的代码缺少输入的地方吗?

0 投票
1 回答
326 浏览

apache-kafka - 卡夫卡延迟线性增加

我正在尝试对 Apache Kafka 进行基准测试,目前我正在尝试测量延迟。为了做到这一点,我正在生成 10 MB 的推文数据(约 14500 条推文),以免失去对正在发生的事情的概述,但我基本上可以在很久以后任意发送。因此,这 10 MB 目前仅用于测试目的。

我已将代理配置为使用LogAppendTime时间戳,我将其与消费者端的当前时间进行比较。这样做时,我总是得到随时间增加的延迟值(请参阅下面的详细信息),这不是我所期望的。

设置

  • 带有 Ubuntu 20 的虚拟机
  • 2 个 VPU
  • 9GB 内存
  • confluent-kafkaApache Kafka在最新版本中
  • 1个Partition,1个Kafka Broker,所有默认设置(设置除外LogAppendTime

生产者和消费者都在同一台机器上运行(至少现在是这样)。虚拟机在数据中心运行,所以带宽应该没问题。

常数

生产者代码

消费者守则

我得到什么

消费端

制作方

我会期待什么

如您所见,我正在使用delta = (time.time() * 1000) - msg.timestamp()[1]. 我希望延迟最多在 10 到 100 毫秒之间变化。我不明白为什么延迟会随着时间“线性”增加。

另外,我不明白为什么buffer queue会变得如此之大,.poll()因为据我所知,Kafka 应该在每次通话后清除队列。

有谁知道这可能是什么原因以及我能做些什么?

0 投票
0 回答
47 浏览

python - 如何转移 kafka kerberos 日志

我已经使用 confluent-kafka-python 库在 python 中为 kerberized kafka 集群编写了 kafka 生产者和消费者代码。他们都工作正常,但打印了大量的日志。是否可以将这些日志转移到文件中?

0 投票
2 回答
637 浏览

apache-kafka - 使用 python 从 kafka 读取最新偏移量

我正在使用 confluent-kafka Python 库从 kafka 中读取。我正在使用以下消费者设置

我的目标是确保我始终阅读 kafka 中的最新消息。只要程序继续运行,上述方法就可以工作。但是,如果程序由于某种原因崩溃,它会从上次使用的消息开始读取,而不是从主题中的最后一条消息开始读取。

我不介意丢失一些消息,但我始终阅读最新消息是绝对必要的。看起来消费者记住了偏移量并从它开始而不是从最新的偏移量开始。

我尝试将enable.auto.commit参数设置为 False,但我得到了相同的结果。

0 投票
1 回答
323 浏览

python - kafka python - 找到消费者尚未订阅的新 kafka 主题的正确方法是什么?

我是卡夫卡世界的新手,并试图为 python 中的卡夫卡消费者做以下事情

  1. 获取所有 kafka 主题的列表。
  2. 获取消费者订阅的主题列表。
  3. 订阅新主题(尚未订阅)。

注意:我可以使用 confluent-kafka / kafka-python 库来实现这一点。

任何帮助,将不胜感激。

0 投票
0 回答
107 浏览

apache-spark - kafkashaded.org.apache.kafka.common.errors.TimeoutException:60000 毫秒后更新元数据失败

我目前正在研究用例,我正在将 pyspark 数据框写入 confluent-kafka 主题。

下面是我得到的错误。

在该主题上启用的身份验证是 `SASL PLAIN。我想知道,我将数据帧写入 confluent-kafka 主题的方法是否正确?还是我还需要添加其他配置。

我是新来的火花。任何帮助,将不胜感激。