问题标签 [kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - Kafka Consumer:如何从 Python 中的最后一条消息开始消费
我正在使用 Kafka 0.8.1 和 Kafka python-0.9.0。在我的设置中,我设置了 2 个 kafka 代理。当我运行我的 kafka 消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都很好!
我的问题是,当我重新启动消费者时,它会从头开始消费消息。我所期待的是,在重新启动时,消费者会从它死前停止的地方开始消费消息。
我确实尝试跟踪 Redis 中的消息偏移量,然后在从队列中读取消息之前调用 consumer.seek 以确保我只收到以前未见过的消息。虽然这行得通,但在部署此解决方案之前,我想与大家核实一下……也许我对 Kafka 或 python-Kafka 客户端有一些误解。似乎消费者能够从中断的地方重新开始阅读是非常基本的功能。
谢谢!
python - 如何使用 Python 以编程方式在 Apache Kafka 中创建主题
到目前为止,我还没有看到一个 python 客户端在不使用配置选项自动创建主题的情况下显式实现主题的创建。
python - 用于消费的python kafka不起作用
我可以写信给卡夫卡。但是,消费不起作用
python - 在 kafka-python 的客户端中消费消息
我是卡夫卡的新手。通过一些 kafka-python 在线教程的帮助,我编写了以下代码:
但问题是,在最后一个 for 循环中,代码执行被卡住了,我无法弄清楚。
python - 如何从python客户端将JSON对象发送到kafka
我有一个简单的 JSON 对象,如下所示
以下是我向 Kafka 发送消息的 python 代码
我在风暴日志中看到正在接收消息,但它为元组 { json structure in here } 抛出 Transformation null 不确定需要做什么才能解决这个问题?..
python - 如何在程序中停止 Python Kafka Consumer?
我正在做 Python Kafka 消费者(尝试在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer )。当我运行以下代码时,它会一直运行,即使所有消息都已消耗。我希望消费者在消费完所有消息后会停止。怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Consumer 中)。
更新
我使用信号处理程序来调用 consumer.stop()。一些错误信息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过client.close()。但同样的结果。
我需要一些方法来优雅地停止 for 循环。
欢迎任何帮助。谢谢。
python - 如何使用 kafka.consumer.SimpleConsumer,seek()
API 文档在这里: http: //kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html
但是当我运行以下代码时,异常是%d 格式:需要一个数字,而不是 NoneType
当我使用以下代码时,异常是seek() got an unexpected keyword argument 'partition'
任何想法?谢谢。
python - 使用多处理持续使用 Kafka 并以特定时间间隔更新队列
我正在尝试不断消耗来自 kafka 的事件。同一个应用程序也使用这些消耗的数据,以 n 秒的间隔(假设 n = 60 秒)执行一些分析和更新数据库。
在同一个应用程序中,如果process1 = Kafka Consumer , process2= Data Analysis and database update logic.
process2
与计算和数据库更新有关,因此需要 5-10 秒来执行。我不想在执行process1
期间停顿process2
。因此,我正在使用multiprocessing module
(process1,process2
如果thread1,thread2
我在 python 中使用该Threading
模块,但由于我已经阅读了有关 GIL 的内容并且该Threading
模块无法利用多核架构,我决定使用该multiprocessing
模块。)来实现在这种情况下并发。(如果我对上面提到的模块限制的理解GIL
不Threading
正确,我很抱歉,请随时纠正我)。
我拥有的应用程序在两个进程之间进行了相当简单的交互,其中process1
仅在 60 秒内用它收到的所有消息填充队列,并在 60 秒结束时将所有消息传输到process2
.
我在使用此传输逻辑时遇到问题。如何在 60 秒结束时将队列的内容从转移process1
到process2
(我猜这将是主进程或另一个进程?这是我的另一个问题,除了主进程之外,我是否应该实例化 2 个进程?)随后清除队列内容,以便在另一个迭代中重新开始。
到目前为止,我有以下内容:
任何帮助将非常感激。
python - 使用 python Spark 将大型 CSV 发送到 Kafka
我正在尝试将大型 CSV 发送到 kafka。基本结构是读取 CSV 的一行并将其与标题一起压缩。
然后将其转换为带有以下内容的json:
然后我使用 kafka-python 库发送消息
使用 PYSPARK,我很容易从 CSV 文件中创建了消息的 RDD
现在我想发送这些消息:我定义了一个函数
然后我创建一个新的 RDD 来发送消息
然后我调用 sendRDD.count()
哪个开始搅动和发送消息
不幸的是,这非常慢。它每秒发送 1000 条消息。这是在一个 10 个节点的集群上,每个集群有 4 个 CPU 和 8GB 内存。
相比之下,在 1000 万行 csv 上创建消息大约需要 7 秒。~ 约 2GB
我认为问题在于我在函数内部实例化了一个 kafka 生产者。但是,如果我不这样做,那么 spark 会抱怨即使我尝试在全球范围内定义生产者也不存在。
也许有人可以阐明如何解决这个问题。
谢谢,
apache-kafka - 使用 kafka-python kafka 客户端的 KafkaConsumer 实例读取 Kafka 中最旧的可用消息
我尝试使用以下命令读取 kafka 消费者中的消息:
在这里我们可以读取大约 4 天的旧消息,因为我们在 kafka 服务器配置文件中将保留时间设置为 7 天。但是,当我们尝试使用 kaka-python 客户端库的 KafkaConsumer 读取消息时,如下所示:
我们今天收到的消息只有一些偏移量。我不知道如何在 Kafka 中获取最旧的消息,就像我们在上面的 kafka 消费者 shell 脚本中得到的那样。请帮忙。