问题标签 [kafka-rest]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
432 浏览

apache-kafka - Kafka rest api 获取消费者组的详细信息

我正在寻找一个kafka rest api,它将列出与

kafka-consumer-groups.sh - - 描述

会返回,基本上我正在尝试获取每个分区的偏移量及其特定消费者组的滞后的详细信息

0 投票
2 回答
2132 浏览

maven - 在本地找不到工件 io.confluent:kafka-rest-parent:pom:5.4.0-SNAPSHOT 和“parent.relativePath”点

我正在使用 maven 3 运行应用程序,但出现以下错误:

这里分享 pom .xml 父元素

http://maven.apache.org/maven-v4_0_0.xsd">

0 投票
2 回答
835 浏览

ssl - Confluent REST 代理 API SSL 握手失败

我在 docker 上有一个使用融合图像的 kafka 集群。我正在使用 docker-compose 来构建容器。

当我尝试运行容器时,它启动但由于 SSL 握手失败而无法与任何代理通信。我不知道我是否错过了一些配置

[kafka-admin-client-thread | adminclient-1] 错误 org.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] 连接到节点 -3 (/XXX:19092) 身份验证失败,原因是:SSL 握手失败

我的 Kafka 代理配置如下:

卡夫卡1:

我正在尝试使用以下配置将 Confluent REST 代理 API 引入另一个容器:

卡夫卡休息代理:

0 投票
0 回答
622 浏览

java - 为什么 Java Kafka Consumer 返回空记录列表?

我有应用程序,它实现了来自 Confluent REST Proxy for Kafka 的GET /topics/(string: topic_name)/partitions/(int: partition_id)/messages?offset=(int)[&count=(int)] 之类的 API。

所以,我有一群消费者。我的 API 处理程序非常简单:

  1. 从池中获取消费者
  2. consumer.assign(util.Arrays.asList(partition))
  3. consumer.seek(partition, startOffset)
  4. consumer.poll(Duration.ofMillis(300L))
  5. consumer.unsubscribe()
  6. 将消费者返回池

所以,我的解决方案在几天内效果很好。然后,发生了一些事情,并且poll()总是返回空的记录列表。

我可以解决此问题以重新启动我的应用程序。另外,我可以启动新的应用程序实例,这个实例可以从 Kakfa 读取记录,所以 Kafka 是活着的。

0 投票
1 回答
1111 浏览

apache-kafka - 如何在融合的 kafka 休息代理中访问主题的最新偏移量以计算滞后

在 confluent kafka rest proxy 中,我们可以获得特定消费者组的最后提交偏移量,但我们如何获得主题的最新偏移量来计算延迟。

0 投票
2 回答
2188 浏览

apache-kafka - 为什么在 Kafka REST 代理中进行 base64 编码/解码?

Producer 将消息序列化并以字节数组的形式发送给 Broker。消费者反序列化这些字节数组。Broker 总是存储和传递字节数组。我是这样理解的。

但是当您在Kafka中使用REST 代理时,生产者使用 base64 对消息进行编码,而消费者对这些 base64 消息进行解码。

生产者和消费者的 Python 示例:

为什么你用base64而不是字节数组向Broker发送消息?使用 REST 代理时,Broker 以 base64 格式存储消息?

0 投票
3 回答
7483 浏览

docker - 无法反序列化主题的数据

我从这里使用融合的 cp-all-in-one 项目配置:https ://github.com/confluentinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one /docker-compose.yml

我正在http://localhost:8082/topics/zuum-positions 使用以下 AVRO 正文发布一条消息:

我已将以下标头正确添加到上述 POST 请求中: Content-Type: application/vnd.kafka.avro.v2+json Accept: application/vnd.kafka.v2+json

执行此请求时,我在 docker 日志中看到以下异常:

我花了几个小时在这上面,找不到原因。通常,当connect无法连接到架构注册表时会发生此错误,但我已从此处保留其配置:https ://github.com/confluentinc/cp-docker-images/blob/5.2.2-post/examples/cp -all-in-one/docker-compose.yml#L77

你能帮忙吗?

0 投票
1 回答
381 浏览

windows - 适用于 Windows 的 Kafka 休息代理

Windows 有什么解决方案来设置和运行 Kafka 休息代理?所以我可以通过 IP/浏览器浏览和发送 API 获取/发布消息?

例如在windows 浏览器获取主题,并使用soapui 工具发送GET 消息进行查询?http://IP地址:8082/topics

我有 Kafka 服务器和 Zookeer 在 Windows 环境中运行。通过 cmd 创建主题和查看主题工作正常,但是在浏览 url http://localhost:8082/topics时,我得到空的服务器响应,我猜这是由于 kafka-rest 没有安装在 Windows 上。

0 投票
2 回答
1296 浏览

apache-kafka - 确保已使用 REST 代理从 Kafka 主题读取所有消息

我是 Kafka 的新手,我们的团队正在研究服务间通信的模式。

目标

我们有两个服务,P(生产者)和 C(消费者)。P 是 C 需要的一组数据的真实来源。当 C 启动时,它需要将所有当前数据从 P 加载到它的缓存中,然后订阅更改通知。(换句话说,我们希望在服务之间同步数据。)

数据总量比较少,变化不频繁。同步的短暂延迟是可以接受的(最终一致性)。

我们希望将服务解耦,这样 P 和 C 就不需要相互了解。

提案

当 P 启动时,它会将其所有数据发布到启用了日志压缩的 Kafka 主题。每条消息都是一个聚合,其 ID 为一个键。

当 C 启动时,它会从主题的开头读取所有消息并填充其缓存。然后它会继续从其偏移量中读取以收到更新通知。

当 P 更新其数据时,它会为已更改的聚合发布一条消息。(此消息与原始消息具有相同的架构。)

当 C 收到一条新消息时,它会更新其缓存中的相应数据。

在此处输入图像描述

约束

我们正在使用Confluent REST 代理与 Kafka 进行通信。

问题

当 C 启动时,它如何知道它何时从主题中读取了所有消息,以便它可以安全地开始处理?

如果 C 没有立即注意到 P 一秒钟前发送的消息,这是可以接受的。如果 C 在消费一小时前 P 发送的消息之前开始处理,这是不可接受的。请注意,我们不知道何时会更新 P 的数据。

我们不希望 C 在消费每条消息后必须等待 REST 代理的轮询间隔。

0 投票
2 回答
2904 浏览

apache-kafka - 如何更改 Kafka Rest Proxy CURL 命令以便在浏览器中使用它

我正在 Kafka 中试用 Rest Proxy。

当我在浏览器中输入以下网址时http://192.168.0.30:8082/topics

我得到了预期的结果:


我的问题:我尽量不使用 CURL。我有以下 CURL 命令示例。如果我只想使用上述浏览器,我该如何更改?

我试过这个,但是......(我怎样才能消费我的话题test?) 在此处输入图像描述

**只是文档中的一个示例:**