问题标签 [pykafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1179 浏览

python - KafkaUtils.createDirectStream时TypeError:unhashable type:'TopicAndPartition'的原因是什么?

我想使用任意偏移量的 kafka 消息KafkaUtils.createDirectStream

我的源代码:

但得到如下错误:

pyspark 源代码:

fromOffsets 应该是一个字典,字典的键应该是一个TopicAndPartition对象。

有什么想法吗?

0 投票
1 回答
6524 浏览

python - 如何使用 uwsgi 和 gevent 使 kafka-python 或 pykafka 作为异步生产者工作?

我的堆栈是带有 gevents 的 uwsgi。我正在尝试用装饰器包装我的 api 端点,以将所有请求数据(url、方法、正文和响应)推送到 kafka 主题,但它不起作用。我的理论是因为我正在使用 gevents,并且我试图在异步模式下运行这些,实际上推送到 kafka 的异步线程无法使用 gevents 运行。如果我尝试使方法同步,那么它也不起作用,它在生产工人中死去,即在生产之后调用永远不会返回。虽然这两种方法在 python shell 上运行良好,如果我在线程上运行 uwsgi。

遵循示例代码: 1. 使用 kafka-python (async)

  1. 使用 py-kafka(同步):

    /li>
0 投票
3 回答
540 浏览

pykafka - 为什么在启动一些消费者时出现错误 PartitionOwnedError 和 ConsumerStoppedException

我使用 pykafka 从 kafka 主题中获取消息,然后进行一些处理并更新到 mongodb。由于pymongodb每次只能更新一项,所以我启动了100个进程。但是在启动时,一些进程出现错误“PartitionOwnedError and ConsumerStoppedException”。我不知道为什么。谢谢你。

>

>

0 投票
0 回答
860 浏览

apache-kafka - Producer 无法通过 DNS 连接到 broker

我有一个物理服务器,在那里我将advertised.host.name 设置为服务器ip,并在路由器上进行端口转发。但是生产者无法使用 dns 连接到代理。

错误:pykafka.connection:无法连接到 192.168.1.3:9092 警告:pykafka.producer:Broker 192.168.1.3:9092 已断开连接。重试。

0 投票
1 回答
1802 浏览

git - Publisher 如何将消息发布到 Apache Kafka 中的主题?

我是 Apache Kafka 的新手。我不了解 Apache Kafka 中主题和分区的剖析以及 Producer 将数据推送到分区的方式。

考虑一个场景,我有两个生产者 PR1、PR2 和三个代理 B1、B2、B3。一个主题 T1 与三个分区为 P1、P2、P3 拆分为三个代理。现在第一个生产者 PR1 与 Zookeeper 协调并找到 Broker 并推送消息。(比如日志服务器以每秒 1 条记录的速度推送其日志数据)到 T1 - P1 并将偏移量设置为 0。我怀疑第二条记录如何被推。它会推送到分区 P2 还是 P3 ?或者第一条记录本身是并行推送到所有三个分区。

现在第二个发布者加入并发布消息到分区。消息在哪里推送,它会推送到 P1 吗?如果是这种情况,PR1 已经将消息推送到 P1,PR1 和 PR2 是否会同时将消息背靠背附加到 P1,从而创建偏移量 0、1、2、3、4、5....?

0 投票
1 回答
915 浏览

python - 使用 pykafka 在 kafka 主题创建上创建多个分区

我正在使用以下代码使用 pykafka python 库 api 创建一个 kafka 主题。

这里的主题是默认创建的,只有 1 个分区。如何使用 pykafka 为主题分配多个分区,例如

topic.number_of_partitons=3?

0 投票
2 回答
889 浏览

apache-kafka - 我可以让多个消费者同时阅读同一个 Kafka 主题吗?

目前我有一个 Kafka 主题。

现在我需要运行多个消费者,以便可以并行读取和处理消息。

这可能吗。

我正在使用 python 和 pykafka 库。

在两个消费者中都接受相同的消息。我只需要处理一次消息。

0 投票
1 回答
1455 浏览

apache-kafka - PyKafka 无法连接到分区的 kafka 主题

操作

现在我在两个不同的 tmux 面板中运行以下 python 代码:

第一次运行 -在不同的面板或窗口或终端中第二次运行没问题

我得到以下回溯:

蟒蛇消费者.py

我正在使用 Fedora 25 / Python 3.5.2

0 投票
0 回答
207 浏览

python-3.x - 如何知道kafka分区的分配情况

我在scrapy中使用带有pykafka的kafka 0.9.0作为url队列,并且我使用的主题有30个分区和4个消费者。

一开始它可以正常工作,但是当它工作了大约几个小时时,kafka 似乎在重新平衡分区时遇到了问题,我得到了一些信息,例如

无法获取分区 < pykafka.partition.Partition at 0x7f0b422cbfd0 (id=23) >。重试

比它引发两个例外

pykafka.exceptions.ConsumerStoppedException

pykafka.exceptions.PartitionOwnedError

所以想知道kafka分区的分配情况。我使用kafka-consumer-groups.sh 之类的

./kafka-consumer-groups.sh --zookeeper host:port --group name --describe

在kafka的文档中提到,但我得到了一个空列表。

0 投票
2 回答
182 浏览

python - pykafka 主题返回无

主题的结果是一个所有值都为无的字典:

如何解决?