问题标签 [pykafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - KafkaUtils.createDirectStream时TypeError:unhashable type:'TopicAndPartition'的原因是什么?
我想使用任意偏移量的 kafka 消息KafkaUtils.createDirectStream
。
我的源代码:
但得到如下错误:
pyspark 源代码:
fromOffsets 应该是一个字典,字典的键应该是一个TopicAndPartition
对象。
有什么想法吗?
python - 如何使用 uwsgi 和 gevent 使 kafka-python 或 pykafka 作为异步生产者工作?
我的堆栈是带有 gevents 的 uwsgi。我正在尝试用装饰器包装我的 api 端点,以将所有请求数据(url、方法、正文和响应)推送到 kafka 主题,但它不起作用。我的理论是因为我正在使用 gevents,并且我试图在异步模式下运行这些,实际上推送到 kafka 的异步线程无法使用 gevents 运行。如果我尝试使方法同步,那么它也不起作用,它在生产工人中死去,即在生产之后调用永远不会返回。虽然这两种方法在 python shell 上运行良好,如果我在线程上运行 uwsgi。
遵循示例代码: 1. 使用 kafka-python (async)
使用 py-kafka(同步):
/li>
pykafka - 为什么在启动一些消费者时出现错误 PartitionOwnedError 和 ConsumerStoppedException
我使用 pykafka 从 kafka 主题中获取消息,然后进行一些处理并更新到 mongodb。由于pymongodb每次只能更新一项,所以我启动了100个进程。但是在启动时,一些进程出现错误“PartitionOwnedError and ConsumerStoppedException”。我不知道为什么。谢谢你。
>
>
apache-kafka - Producer 无法通过 DNS 连接到 broker
我有一个物理服务器,在那里我将advertised.host.name 设置为服务器ip,并在路由器上进行端口转发。但是生产者无法使用 dns 连接到代理。
错误:pykafka.connection:无法连接到 192.168.1.3:9092 警告:pykafka.producer:Broker 192.168.1.3:9092 已断开连接。重试。
git - Publisher 如何将消息发布到 Apache Kafka 中的主题?
我是 Apache Kafka 的新手。我不了解 Apache Kafka 中主题和分区的剖析以及 Producer 将数据推送到分区的方式。
考虑一个场景,我有两个生产者 PR1、PR2 和三个代理 B1、B2、B3。一个主题 T1 与三个分区为 P1、P2、P3 拆分为三个代理。现在第一个生产者 PR1 与 Zookeeper 协调并找到 Broker 并推送消息。(比如日志服务器以每秒 1 条记录的速度推送其日志数据)到 T1 - P1 并将偏移量设置为 0。我怀疑第二条记录如何被推。它会推送到分区 P2 还是 P3 ?或者第一条记录本身是并行推送到所有三个分区。
现在第二个发布者加入并发布消息到分区。消息在哪里推送,它会推送到 P1 吗?如果是这种情况,PR1 已经将消息推送到 P1,PR1 和 PR2 是否会同时将消息背靠背附加到 P1,从而创建偏移量 0、1、2、3、4、5....?
python - 使用 pykafka 在 kafka 主题创建上创建多个分区
我正在使用以下代码使用 pykafka python 库 api 创建一个 kafka 主题。
这里的主题是默认创建的,只有 1 个分区。如何使用 pykafka 为主题分配多个分区,例如
topic.number_of_partitons=3
?
apache-kafka - 我可以让多个消费者同时阅读同一个 Kafka 主题吗?
目前我有一个 Kafka 主题。
现在我需要运行多个消费者,以便可以并行读取和处理消息。
这可能吗。
我正在使用 python 和 pykafka 库。
在两个消费者中都接受相同的消息。我只需要处理一次消息。
apache-kafka - PyKafka 无法连接到分区的 kafka 主题
操作:
现在我在两个不同的 tmux 面板中运行以下 python 代码:
第一次运行 -在不同的面板或窗口或终端中第二次运行没问题
我得到以下回溯:
蟒蛇消费者.py
- 我的虚拟环境要求.txt
- 我的 kafka server.properties
- 我的动物园管理员zoo.cfg
我正在使用 Fedora 25 / Python 3.5.2
python-3.x - 如何知道kafka分区的分配情况
我在scrapy中使用带有pykafka的kafka 0.9.0作为url队列,并且我使用的主题有30个分区和4个消费者。
一开始它可以正常工作,但是当它工作了大约几个小时时,kafka 似乎在重新平衡分区时遇到了问题,我得到了一些信息,例如
无法获取分区 < pykafka.partition.Partition at 0x7f0b422cbfd0 (id=23) >。重试
比它引发两个例外
pykafka.exceptions.ConsumerStoppedException
pykafka.exceptions.PartitionOwnedError
所以想知道kafka分区的分配情况。我使用kafka-consumer-groups.sh 之类的
./kafka-consumer-groups.sh --zookeeper host:port --group name --describe
在kafka的文档中提到,但我得到了一个空列表。
python - pykafka 主题返回无
主题的结果是一个所有值都为无的字典:
如何解决?