问题标签 [apache-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
6752 浏览

apache-kafka - 如何获得 Kafka 的确认

一旦消息被消费或处理,我如何准确地从 Kafka 获得确认。听起来可能很愚蠢,但是有没有办法知道已收到 ack 的消息的开始和结束偏移量?

0 投票
2 回答
5855 浏览

apache-kafka - Apache Kafka 消费者组和简单消费者

我是 Kafka 的新手,到目前为止,我对消费者的理解是基本上有两种类型的实现。
1)高级消费者/消费者群体
2)简单消费者

关于高级抽象最重要的部分是当 Kafka 不关心处理偏移量时使用它,而简单消费者对偏移量管理提供更好的控制。让我感到困惑的是,如果我想在多线程环境中运行消费者并且还想控制偏移量。如果我使用消费者组,这是否意味着我必须从存储在 zookeeper 中的最后一个偏移量中读取?这是我唯一的选择。

0 投票
2 回答
1599 浏览

mongodb - 水槽或 kafka 相当于 mongodb

在 Hadoop 世界中,flume 或 kafka 用于流式传输或收集数据并将它们存储在 Hadoop 中。我只是想知道 Mango DB 是否有一些类似的机制或工具来实现这些?

0 投票
1 回答
1628 浏览

apache-kafka - 为什么在服务器启动时服务器失败并显示“java.net.SocketException:无效参数”?

卡夫卡0.8

我遵循快速入门指南,当我进入第 2 步运行时,我遇到bin/kafka-server-start.sh config/server.properties了异常:

我可能做错了什么?请指教。

0 投票
1 回答
5515 浏览

java - Kafka - 生产者 - 处理“发送失败”

我正在运行 0.8 Kafka,并使用提供的 Java API 构建生产者。
发送消息(或消息)的 API 函数返回 void。

有没有办法获取已发送消息的状态?如果它发送或失败?

这对我们来说非常重要,因为我们正在从文件中读取消息,并且我们希望在发送完所有消息后删除该文件。但是如果出现错误并且没有发送一些消息并且我删除了文件,它将导致非常重要的数据丢失。

0 投票
1 回答
1620 浏览

hadoop - 为什么 Flume 需要有 AMQP 源?

Flume 有几个第三方插件来支持 AMQP 源。为什么我们要向 rabbitmq 或 qpid 发送消息,然后发送到 flume 而不是直接发送到 flume ?我错过了什么吗?

另外,在什么情况下我应该使用 Qpid、rabbitMQ 等消息队列以及 Flume 等消息队列?我读了 Qpid ,RabbitMQ 保证订购交付,这对我来说并不重要。还有其他区别吗?

我们可以动态添加通道和接收器到正在运行的水槽代理吗?使用文件 roll sink 向源添加新通道,它不需要任何代码更改,只需更改 conf 文件并重新启动。有没有办法动态地做到这一点,即无需重新启动水槽代理

0 投票
3 回答
35803 浏览

apache-kafka - KafKa 分区器类,使用键将消息分配给主题内的分区

我是kafka的新手,如果我听起来很愚蠢,请道歉,但到目前为止我的理解是..消息流可以定义为一个主题,就像一个类别。并且每个主题都被划分为一个或多个分区(每个分区可以有多个副本)。所以他们并行行动

他们说从卡夫卡主站点

生产者可以选择将哪条消息分配给主题中的哪个分区。这可以简单地以循环方式完成以平衡负载,也可以根据某些语义分区函数(例如基于消息中的某个键)来完成。

这是否意味着在消费时我将能够从特定分区中选择消息偏移量?在运行多个分区时,是否可以从一个特定分区(即分区 0)中进行选择?

在 Kafka 0.7 quick start他们说

发送带有分区键的消息。具有相同键的消息被发送到同一个分区。

并且可以在创建生产者时提供密钥,如下所示

现在我如何根据这个键消费消息?在 Kafka 中生产时使用此密钥的实际影响是什么?

在 0.8beta 中创建生产者时,我们可以通过配置文件提供分区器类属性。可以创建自定义分区器类来实现 kafka 分区器接口。但我有点困惑它究竟是如何工作的。0.8 doc也没有过多解释。有什么建议或遗漏什么吗?

0 投票
2 回答
10578 浏览

apache-kafka - Kafka 0.8,是否可以使用 java 代码创建具有分区和复制的主题?

在 Kafka 0.8beta 中,可以使用如下命令创建主题,如下所述

上面的命令将创建一个名为“test”的主题,每个分区有 3 个分区和 2 个副本。

我可以使用 Java 做同样的事情吗?

到目前为止,我发现使用 Java 我们可以创建一个生产者,如下所示

这将创建一个名为“mytopic”的主题,其分区数使用“num.partitions”属性指定并开始生成。

但是有没有办法定义分区和复制呢?我找不到任何这样的例子。如果我们不能,那是否意味着我们总是需要在之前创建带有分区和复制的主题(根据我们的要求),然后使用生产者在该主题中生成消息。例如,如果我想以相同的方式创建“mytopic”但分区数量不同(覆盖 num.partitions 属性),是否有可能?

0 投票
1 回答
11150 浏览

java - Storm-Kafka 多个 spout,如何分担负载?

我正在尝试在多个喷口之间共享任务。我有一种情况,我一次从外部源获取一个元组/消息,并且我想要一个 spout 的多个实例,背后的主要目的是分担负载并提高性能效率。

我可以对一个 Spout 本身做同样的事情,但我想在多个 Spout 之间共享负载。我无法获得分散负载的逻辑。因为在特定的 spout 完成消费部分(即基于缓冲区大小设置)之前,消息的偏移量是未知的。

任何人都可以对如何解决逻辑/算法提出一些亮点吗?

提前感谢您的时间。


响应答案更新:
现在在 Kafka 上使用了多分区(即5
以下是使用的代码:
builder.setSpout("spout", new KafkaSpout(cfg), 5);

800 MB通过在每个分区上充斥数据进行测试,并~22 sec完成读取。

再次,使用parallelism_hint = 1的代码,
builder.setSpout("spout", new KafkaSpout(cfg), 1);

现在需要更多~23 sec!为什么?

根据 Storm Docs setSpout() 声明如下:

其中,
parallelism_hint - 应该分配给执行这个 spout 的任务数。每个任务将在集群周围某处的进程中的线程上运行。

0 投票
0 回答
765 浏览

hadoop - Kafka/flume读取事务日志——实时备份RDBMS

有没有人尝试使用 Kafka 或 Flume 来跟踪/读取 RDBMS 的事务日志并将数据重写到 Hadoop(例如 HBase)?因此,您可以从您的 RDBMS 中保持近乎实时的备份。

求一些经验分享。