问题标签 [kafka-python]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
737 浏览

python - python-kafka:消费者是否可以根据消息属性跳过消息?

给定一组消息,每个消息都有一个优先级属性。消费者是否可以跳过或忽略优先级属性不够高的消息?

一开始我反序列化了 kafka-python 消费者消费的消息,检查了优先级并丢弃了那些不符合要求的优先级的消息。

但我担心如果消息很大,将浪费大量时间来反序列化不会进一步处理的消息。从kafka队列中读取是否有办法做到这一点?

0 投票
1 回答
2901 浏览

apache-kafka - Kafka EARLIEST 和 Kafka LATEST 偏移重置的效率如何?

问题

我正在考虑实现二进制搜索以找到基于时间的事件重放的起始偏移量。为了做到这一点,我正在考虑使用 EARLIEST 来查找开始偏移量和 LATEST 来查找最新偏移量。之后我可以实现二进制搜索来找到我需要开始重播的偏移量。

问题

我想知道寻找 EARLIEST 和 LATEST 的效率如何,以及它是如何实施的。他们是否只是简单地使用主题目录中的 znode 时间戳并找到具有最新时间戳的文件来查看?那将是我的猜测,但我只是在那里拍摄黑暗。

先感谢您!

0 投票
1 回答
436 浏览

apache-kafka - 如何使用 python 或使用任何内置方法从 kafka 主题中删除特定数量的行?

我在使用 consumer.poll() 方法时遇到问题。使用 poll() 方法获取数据后,消费者将没有任何数据要提交,所以请帮我从 kafka 主题中删除特定数量的行。

0 投票
1 回答
1843 浏览

python - Python:模拟 Kafka 进行集成测试

我对集成测试有点陌生。我有两个使用 Kafka 将消息相互传递的服务。但是,对于我的集成测试,我不一定想让 Kafka 运行以运行我的测试。有没有标准的方法来模拟卡夫卡?或者这是我需要自己创建的东西,一些 MockKafka 队列和应用程序中适当的补丁?此外,这是否违反了集成测试应该做的事情?我对此的看法是我没有测试 Kafka 的任何功能,并且为了集成测试应该被模拟出来。

0 投票
0 回答
139 浏览

apache-kafka - 当我想在消费者组重新平衡回调函数中存储和获取 kafka 之外的偏移量时如何获取组 ID

但我无法在这些回调函数中获取 consumer_group_id

0 投票
1 回答
2741 浏览

python-2.7 - PySpark 处理流数据并将处理后的数据保存到文件

我正在尝试复制一个正在流式传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。我正在使用 Kafka 和 Spark 流式传输(在 pyspark 上),这是我的架构:

1-Kafka 生产者以以下字符串格式向名为 test 的主题发出数据:

生产者代码:

生产者工作正常,我在消费者中获得流数据(甚至在火花中)

2- Spark 流正在接收这个流,我什至pprint()可以

Spark流处理代码

作为一个错误,我得到:

和其他例外。

我真正想要的是将每个条目"LG float LT float"作为 JSON 格式保存在文件中,但首先我想简单地将坐标保存在文件中,我似乎无法做到这一点。有什么想法吗?

如果需要,我可以提供完整的堆栈跟踪

0 投票
1 回答
461 浏览

python - 当一些经纪人不可用时,python kafka 生产者如何工作?

我已经建立了一个 3 节点 kafka 集群并使用 python 作为生产者,如下所示:

当“n0”和“n1”可用但“n2”不可用(broker故障或网络错误)时,producer无法正常工作,发送到“n0n1”但抛出错误:

0 投票
1 回答
811 浏览

apache-kafka - 新 kafka 消费者的等效属性“num.consumer.fetchers”

在 Kafka 的旧消费者配置中,有一个属性num.consumer.fetchers可以配置用于获取数据的获取线程数。在 Kafka 的新消费者配置中,是否有任何具有相同功能的属性?如果没有,新消费者是如何解决这个问题的?

0 投票
1 回答
1199 浏览

apache-spark - Pyspark Kafka 偏移范围单位

我使用 Spark 作为批处理来处理来自 kafka 的日志。在每个周期中,我的代码应该得到任何到达 kafka 消费者的东西。但是,我想限制每个周期从 kafka 获取的数据量。假设 5 GB 或 500000 条日志行..

如果驱动程序失败,我会将偏移量存储在内存和磁盘中。但是我怎样才能强加这些 kafka 偏移量来限制每个周期的最大数据量呢?卡夫卡偏移范围的单位是什么?

提前致谢!

0 投票
1 回答
18476 浏览

apache-kafka - Kafka最优保留和删除策略

我对卡夫卡相当陌生,所以如果这个问题是微不足道的,请原谅我。我有一个非常简单的设置用于定时测试,如下所示:

机器 A -> 写入主题 1(代理)-> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2(代理)-> 机器 A 从主题 2 读取

现在我正在无限循环中发送大约 1400 字节的消息,很快就填满了我的小型代理上的空间。我正在尝试为 log.retention.ms、log.retention.bytes、log.segment.bytes 和 log.segment.delete.delay.ms 设置不同的值。首先,我将所有值设置为允许的最小值,但这似乎降低了性能,然后我将它们设置为我的代理在完全填满之前可以采用的最大值,但是当删除发生时性能再次下降。是否有设置这些值以获得绝对最小延迟的最佳实践?

谢谢您的帮助!