问题标签 [kafka-python]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - kafka-python - 如何提交分区?
使用 kafka-python-1.0.2。
如果我有一个包含 10 个分区的主题,我如何去提交一个特定的分区,同时遍历各种分区和消息。我似乎无法在任何地方找到这样的例子,在文档或其他地方
从文档中,我想使用:
consumer.commit(offset=offsets)
具体来说,如何创建偏移量所需的分区和 OffsetAndMetadata 字典(dict,可选) - {TopicPartition: OffsetAndMetadata}。
我希望函数调用会是这样的:
consumer.commit(partition, offset)
但这似乎并非如此。
提前致谢。
apache-kafka - Kafka:Kafka 是否提供对应用程序级状态转换的支持?
假设我有一个主题,有 6 个分区和 2 个消费者,其中 P1、P2、P3 由 C1 处理,P4、P5、P6 由 C2 处理。假设用户数据 U1 总是到 P1,U2 到 P2,依此类推。
所以,
现在让我们说我们添加了一个消费者 C3,所以重新平衡发生了,现在
所以我的应用程序在 C2 中维护用户 U6 状态,但现在 U6 数据正在流向 C3
现在在这里不知何故,来自 C2 的 U6 状态应该流向 C3。那么这是如何在 Kafka 中实现的,知道它非常常见的问题
或者
如果Kafka不提供任何支持,那么这个问题一般是如何解决的……有没有设计模式来解决它?
apache-kafka - Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?
如果我将 Producer 的 Kafka 配置参数设置为:
那么很可能一个分区中的消息可能不在 send_order 中。
Kafka 是否采取任何额外步骤来确保分区内的消息仅保持发送顺序,或者使用上述配置,分区内可能有乱序消息?
python - Kafka 基本设置创建主题和发送消息的错误消息
我正在尝试设置我的 Kafka,但我有一些关键问题。我什至无法创建主题或发送消息。我的 Kafka 版本号是:0.9.0.1 感谢您的帮助。
请在下面找到我的错误消息:
python - Python - 无模式 Apache Avro 数据序列化
我正在尝试使用 python 2.7 和 Apache Avro(python 客户端)通过 kafka 代理交换序列化消息。我想知道是否有一种方法可以在不创建架构的情况下交换消息。
这是代码(使用模式,sensor.avsc,我想避免的事情):
这是 sensor.avsc 文件:
hadoop - Apache Kafka 是否将消息内部存储在 HDFS 或其他文件系统中
我们有一个项目要求在 Kafka 层测试数据。因此 JSON 文件正在移动到 hadoop 区域,而 kafka 正在读取 hadoop(原始 Json 文件)中的实时数据。现在我要测试从其他系统发送的数据和kafka读取的数据是否应该相同。
我可以在 kafka 验证数据吗?kafka 是否将消息内部存储在 HDFS 上?如果是,那么它是否存储在类似于 hive 内部保存的文件结构中,就像单个表的单个文件夹一样。
python - kafka-python 引发 kafka.errors.ConsumerFetchSizeTooSmall
我正在 python 2.7 中编写一个简单的代码,它正在使用来自 apache kafka 主题的消息。代码如下:
但是提出了这个例外:
我如何修改此参数(ConsumerFetchSize)以使此代码正常工作?
python-2.7 - pyspark 无法找到 KafkaUtils.createDirectStream
我有以下 pyspark 脚本,假设连接到本地 kafka 集群:
当我运行它时,我收到以下错误:
我应该怎么做才能访问 KafkaUtils.createDirectStream ?
python-2.7 - 如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?
在我的 pyspark 应用程序中,我打算使用 Spark 流作为一种在“飞行中”转换 Kafka 消息的方法。每条这样的消息最初都是从特定的 Kafka 主题接收的。此类消息将需要进行一些转换(比如说 - 用一个字符串替换另一个字符串),并且转换后的版本需要发布在不同的 Kafka 主题上。第一部分(接收 Kafka 消息)似乎工作正常:
将某些东西(比如说 - 一个字符串)放到不同的 Kafka 主题上的正确语法是什么?这种方法应该由 KafkaUtils 提供,还是以其他方式提供?
python-2.7 - 如何正确使用 pyspark 向 kafka 代理发送数据?
我正在尝试编写一个简单的 pyspark 作业,它将从 kafka 代理主题接收数据,对该数据进行一些转换,并将转换后的数据放在不同的 kafka 代理主题上。
我有以下代码,它从 kafka 主题中读取数据,但对运行 sendkafka 函数没有影响:
为了让我的 sendkafka 函数真正将数据发送到 spark.out kafka 主题,我应该改变什么?