问题标签 [kafka-topic]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
187 浏览

apache-kafka - 使用 kafka 流的 Topic1 到 Topic2

我是 kafka 流的新手,我想阅读一个主题并使用 kafka 流 api 在一个新主题中写入其中的一部分。我的键是字符串,值是 Avro 是否有我可以使用的文档/示例?

编辑 :

在 SUB_TOPIC 我有:

键:{“ID”:“145”} 时间戳:2019 年 3 月 14 日 17:52:23.43 偏移量:12 分区:0

我的输入主题:

{“ID”:“145”,“TIMESTAMP”:1552585938545,“WEEK”:“\u0000”,“SOURCE”:{“string”:“TMP”},“BODY”:{“string”:“{\ "operation_type\":\"INSERT\",\"old\":{\"ROW_ID\":null,\"LAST_UPD\":null,\"DENOMINATION\":null,\"SIREN_SIRET\":null} ,\"new\":{\"ROW_ID\":\"170309-********\",\"LAST_UPD\":\"2019-03-14T17:52:18\",\ "DENOMINATION\":\"1-******\",\"SIREN_SIRET\":null}}" }, "TYPE_ACTION": { "string": "INSERT" } }

如何在新主题中添加 Body 中的其他字段?例子 :

{“ID”:“145”,“TIMESTAMP”:1552585938545,“WEEK”:“\u0000”,“SOURCE”:{“string”:“TMP”},“BODY”:{“string”:“{\ "operation_type\":\"INSERT\",\"old\":{\"ROW_ID\":null,\"LAST_UPD\":null},\"new\":{\"ROW_ID\":\" 170309-********\",\"LAST_UPD\":\"2019-03-14T17:52:18\"}}" }, "TYPE_ACTION": { "string": "INSERT" } }

0 投票
1 回答
20725 浏览

apache-kafka - 为什么kafka不创建主题?bootstrap-server 不是公认的选项

我是 Kafka 的新手,并试图在我的本地机器上创建一个新主题。

我正在关注这个链接

以下是我遵循的步骤:

  1. 启动动物园管理员

    /li>
  2. 启动 kafka-server

    /li>
  3. 创建主题

    /li>

但是在创建主题时,我收到以下错误:

创建主题是否需要任何其他配置?我在做什么错

0 投票
1 回答
659 浏览

apache-kafka - io.confluent.ksql.exception.KafkaTopicExistsException: when launching ksql-server-start ksql-server.properties

I'm working with ksql from quite some time. Kafka cluster if of 3 nodes. I've been using udf as well and all looks good until I stop the servers and start them again. On server start I'm seeing the following in the logs:

Though I've stopped/terminated all the queries, the log prints all the commands I've executed from the beginning for my testing till data, including create, select, drop. I've pulled out the .jar(UDF) from /ext folder and the server started, though the log prints udf function(i'm using) not available.

This is my ksql-server.properties:

Going nuts with the error. I'm deleting the topic and somehow its recreated. Someone please help.

0 投票
1 回答
2569 浏览

python - 如何使用 kafka-python 以编程方式创建主题?

我刚开始使用 Kafka,对 Python 还很陌生。我正在使用这个名为的库kafka-python与我的 Kafka 代理进行通信。现在我需要从我的代码中动态创建一个主题,从我看到的文档中我可以调用create_topics()方法来做到这一点,但是我不确定如何获得这个类的实例。我无法从文档中理解这一点。

有人可以帮我弄这个吗?

0 投票
1 回答
145 浏览

apache-spark - 无法使用火花流从 kafka 主题中读取数据

我正在尝试使用火花流从 kafka 主题中读取数据。我能够将消息生成到 kafka 主题中,但是在使用 spark 流从主题中读取数据时,我收到如下错误消息:

下面是代码:

0 投票
2 回答
178 浏览

apache-kafka - 高扩展性问题:如何跨多个微服务同步数据

我有以下用例:

  1. 假设您有两个处理事件 U 的微服务 AccountManagement 和 ActivityReporting。
  2. 当用户注册时,包含用户信息的事件U会发布到一个broker中,供两个微服务处理。
  3. 出于性能和可扩展性的原因,AccountManagement 和 ActivityReporting 微服务在两个实例中复制
  4. 每个微服务实例都有一个监听代理主题的消费者。主题的选择是让AccountManagement 和ActivityReporting 都可以同时处理U。

但是,我只希望 AccountManagement 的一个实例来处理事件 U,而 ActivityReporting 的一个实例来处理事件 U。

请分享您实施“每个应用程序组消费一次”代理系统的经验。

因为这将有效地解决这个问题。

0 投票
1 回答
130 浏览

apache-kafka - Kafka 消息没有被清除

我是卡夫卡的新手。我正在做一些关于如何清除 kafka 主题中的消息的实验。我发现,如果我们将主题的“retention.ms”属性设置为一些较小的时间值,比如 1 秒,那么根据我的理解,1 秒后主题中的消息将被清除。

我运行了 1 个生产者,它产生了很少的主题消息,并在一段时间后停止了它。同时我运行了一个控制台消费者,所以它得到了生成的消息。在保留时间过去后,我为同一主题启动了另一个消费者控制台,可以说是 1-2 分钟后。但令我惊讶的是,我能够收到有关该主题的消息。当我最终在主题中没有看到任何消息时,再次在 2 分钟后启动控制台使用者。kafka 花了将近 3-4 分钟来清除消息。Kafka 是否需要任何其他设置才能立即清除消息?

0 投票
0 回答
407 浏览

apache-kafka - NestedError:主题/分区更改检查失败

突然,我在运行 apache kafka 消费者组的过程中遇到了问题。任何人都请分享您对为什么会发生此问题的想法。

任何帮助都是不言而喻的。

0 投票
1 回答
571 浏览

apache-kafka - 如何在 Flink 程序中逐行读取 Kafka Topic

首先,我在 Kafka 主题中加载了一个 CSV 文件,我可以通过 Flink 程序打印该主题。代码如下:

我的问题是我想逐行阅读主题并分别处理每一行,请指导我如何逐行阅读 Kafka 主题?

任何帮助将非常感激。

0 投票
1 回答
1199 浏览

apache-kafka - 如何通过 docker-compose 文件为 kafka 挂载卷?

我正在使用带有最新标签的“wurstmeister/kafka”泊坞窗图像。每当我尝试停止和启动 kafka 容器时,它都会以默认配置启动容器。如何挂载卷,以便即使在容器停止或自动重新启动时数据仍然存在。

所有数据都保存在卷中提供的文件夹内的日志文件中,但是当容器重新启动时,它不会从该文件夹加载数据并开始新的副本。

我试过以下:

当容器重新启动时,所有主题都应该保持原样并具有先前创建的相同分区。

任何帮助都是不言而喻的。