问题标签 [kafka-consumer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 将消息从 Kafka 拉到两个目的地
我能够获取 kafka 消息并将其插入 hdfs。我希望能够使用 BI 工具提取同一组消息。
有没有办法做到这一点?我需要创建 2 个消费者吗?还是2个消费群体?
好心提醒。
谢谢
apache-kafka - Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息
我们的用例是从 kafka 中删除过时/未使用的主题,即如果一个主题(在所有分区上)在过去 7 天内没有任何新消息,那么我们会将其视为过时/未使用并删除它。
许多谷歌结果建议在消息中添加时间戳,然后对其进行解析。对于灵魂可以工作的新主题和消息,但我们现有的主题和消息中没有任何时间戳。
我怎样才能得到这个工作?
java - 简单的 Kafka 消费者示例不起作用
我有一个简单的类来使用来自 kafka 服务器的消息。大部分代码抄自org.apache.kafka.clients.consumer.KafkaConsumer.java的注释。
我正在使用“org.apache.kafka:kafka-clients:0.8.2.0”。它抛出异常
我应该如何配置 key.deserializer?
java - Kafka - 使用高级消费者的延迟队列实现
想要使用高级消费者 api 实现延迟消费者
大意:
- 按键生成消息(每个消息都包含创建时间戳)这确保每个分区都按生成时间排序消息。
- auto.commit.enable=false(将在每个消息处理后显式提交)
- 消费一条消息
- 检查消息时间戳并检查是否已经过了足够的时间
- 处理消息(此操作永远不会失败)
提交 1 偏移量
/li>
关于这个实现的一些担忧:
- 提交每个偏移量可能会减慢 ZK
- consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)
- 等待很长时间而不提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?)
- ZK 会话如何在不提交新偏移量的情况下保持活动状态?(设置一个 hive zookeeper.session.timeout.ms 可以解决死消费者而不识别它)
- 我还缺少其他问题吗?
谢谢!
python - 如何在程序中停止 Python Kafka Consumer?
我正在做 Python Kafka 消费者(尝试在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer )。当我运行以下代码时,它会一直运行,即使所有消息都已消耗。我希望消费者在消费完所有消息后会停止。怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Consumer 中)。
更新
我使用信号处理程序来调用 consumer.stop()。一些错误信息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过client.close()。但同样的结果。
我需要一些方法来优雅地停止 for 循环。
欢迎任何帮助。谢谢。
python - 如何使用 kafka.consumer.SimpleConsumer,seek()
API 文档在这里: http: //kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html
但是当我运行以下代码时,异常是%d 格式:需要一个数字,而不是 NoneType
当我使用以下代码时,异常是seek() got an unexpected keyword argument 'partition'
任何想法?谢谢。
apache-kafka - kafka.consumer.simple.SimpleConsumer.offsets 和 kafka.consumer.simple.SimpleConsumer.fetch_offsets 有什么区别
API 文档在这里: http: //kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html
kafka.consumer.simple.SimpleConsumer.offsets 和 kafka.consumer.simple.SimpleConsumer.fetch_offsets 有什么区别(https://github.com/mumrah/kafka-python/blob/adbd4ac052e4a5b40cfc2a3589b7adbcb656afe5/kafka/consumer/simple.py )?
如何获取一个主题的某个分区的所有消息的偏移量?如何获取未消费消息的偏移量?如何获取已消费消息的偏移量?似乎 offsets 和 fetch_offsets 都是消费消息的偏移量。
apache-spark - 如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读取
我Kafka 0.8.2
用于从 AdExchange 接收数据,然后Spark Streaming 1.4.1
将数据存储到MongoDB
.
我的问题是当我重新启动我的Spark Streaming
工作时,例如更新新版本、修复错误、添加新功能。它将继续读取当时最新offset
的kafka
数据,然后在重新启动作业期间我将丢失 AdX 推送到 kafka 的数据。
我尝试了类似的auto.offset.reset -> smallest
方法,但它会从 0 -> last 然后数据很大并且在 db 中重复。
我也尝试设置特定的group.id
和consumer.id
,Spark
但它是一样的。
如何将offset
消耗的最新火花保存到zookeeper
或者kafka
然后可以从该火花读取到最新offset
?
apache-kafka - kafka 主题分区再平衡通知
我正在使用 Kafka 0.8.1.1
是否有任何 API(回调等)可以用来找到消费者lost partitions
或消费者?newly added partitions
apache-kafka - 如何使用 Unix 时间戳通过 SimpleConsumer API 获取偏移量?
我正在尝试使用SimpleConsumer 示例。
我修改代码中的偏移量:
当我使用kafka.api.OffsetRequest.EarliestTime()
or时效果很好kafka.api.OffsetRequest.LatestTime()
。但是当我将它设置为 UNIX TIMESTAMP 时,它此时不会返回消息。
例如
我将时间戳设置为 1439196000000L,即 2015/8/10 16:40:0。但是,它会在该时间前一小时左右返回一条消息。
- 这是分配时间戳的正确方法吗?时间戳应该是 13 位,而不是 10 位,对吧?
- 我在中国,使用北京时间。有关系吗?
- Kafka是否有可能有任何参数来设置集群的时间?