问题标签 [pykafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2904 浏览

python - 如何使用 Pykafka 从主题中获取最新消息?

我正在使用 pykafka 不断向主题生成消息

我想收到最新的消息。我在 pykafka Github 页面上找到了一个解决方案,它建议:

但是,我真的不明白这里发生了什么,如果那里至少有两条消息,它只会获取最新消息。

有没有更强大的解决方案?

0 投票
2 回答
3749 浏览

python - 如何使用消息批处理或带有 pykafka 的缓冲区生成 kafka 主题

如何使用消息批处理或带有 pykafka 的缓冲区生成 kafka 主题。我的意思是一个生产者可以在一个生产过程中产生许多消息。我知道使用消息批处理或缓冲区消息的概念,但我不知道如何实现它。我希望有人可以在这里帮助我

0 投票
1 回答
423 浏览

python - How to use pykafka group consumer with gevent?

I use pykafka group consumer with gevent, but the results have repeating data. show my code:

reulst: Anyone can tell me how to make it work, does pykafka not support gevent?

0 投票
1 回答
3927 浏览

django - kafka 消费者、单独服务或 Django 组件的作用?

我正在设计一个网络日志分析。

我找到了一位架构师,使用 Django(Back-end & front-end)+ kafka + spark。

我还从这个链接中找到了一些相同的系统:http: //thevivekpandey.github.io/posts/2017-09-19-high-velocity-data-ingestion.html和下面的建筑师

在此处输入图像描述

但我对卡夫卡消费者的角色感到困惑。它将是一项独立于 Django 的服务,对吧?

所以如果我想将实时数据绘制到前端图表,我如何附加到 Django。

如果我将 kafka-consumer 和 producer 都放在 Django 中,那就太荒谬了。来自 sdk 的请求通过传递给 kafa 主题(生产者)到达 Django 并返回 Django(消费者)进行处理。为什么我们不直接去。它看起来更简单更好。

请帮我理解kafka消费者的角色,它应该属于哪里?以及如何连接到我的前端。

谢谢和最好的问候,

詹姆斯

0 投票
1 回答
911 浏览

python - 如何在 Pykafka simpleconsumer 中选择起始偏移量?

在我的 kafka 集群单分区主题中,我有一个简单的消费者处理所有传入的消息,如果处理的数据出现错误,我想以相同的顺序重新处理来自某个偏移量(不是开头)的所有消息,以修复不一致并保持来自kafka的原始有序消息序列。

有没有办法用 Pykafka 做到这一点?我不明白

0 投票
1 回答
615 浏览

python - 使用 pykafka 和 asyncio 时异步不起作用

我尝试使用异步调用多个 pykafka 消费者函数。但是,第一个 pykafka 消费者函数将阻止另一个函数工作。

QueueConsumer 库:

然后我使用以下方法调用这些函数:

结果将仅返回来自主题“测试”的队列消息。

编辑:我尝试添加另一个功能

然后像这样调用:

执行阶乘函数的一些打印。但是,当调用 test 或 coba 的 print 时,它只会停止其他的。

0 投票
2 回答
1914 浏览

apache-spark - Kafka 和 Pyspark 集成

我对大数据很天真,我正在尝试将 kafka 连接到 spark。这是我的生产者代码

这是在我执行 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning 时在控制台使用者上打印生成的文本

现在我希望使用 Spark 使用此文本,这是我的 Jupyter 代码

但这会在我的 Jupyter 上产生输出

不是我的控制台消费者上的文字。请帮忙,无法找出错误。

0 投票
1 回答
75 浏览

apache-kafka - kafka如何识别消费群体?

我有这段代码可以在 pykafka 中创建消费者平衡:

如何跨流程管理消费者的状态?

谢谢

0 投票
0 回答
379 浏览

python - pykafka:偏移管理器发现期间套接字断开连接

这是我的代码: 在此处输入图片描述

​当我运行它时,它会警告我: Socket disconnected during offset manager discovery So I use Ctrl+C to stop it then enter image description here

我需要你的帮助,我该怎么办?非常感谢!

0 投票
1 回答
1304 浏览

python - 为什么pykafka的生产者这么慢?

我使用 pykafka 编写了一个简单的生产者,但似乎无法让它执行。基本生产者和生产调用如下。当我用一条小消息调用它 100 次并添加一些计时/分析代码时,大约需要 14 秒。我知道这是异步发送消息,所以我希望它非常快。我缺少一些设置吗?我也用 min_queued_messages=1 尝试过,这需要大约 2 秒的时间。

我确实在 pycharm 中对此进行了分析,并且说 78.8% 的时间都花在了“time.sleep”上?!为什么会睡着?