问题标签 [confluent-cloud]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
830 浏览

java - 无法通过 KafkaIO 在光束中读取卡夫卡

我在 Apchea Beam 中编写了一个非常简单的管道,如下所示从我在 Confluent Cloud 上的 kafka 集群中读取数据,如下所示:

但是,当运行上面的代码从我的 kafka 集群中读取数据时,我得到了以下异常

我在直接 java runner 上运行,我使用的是 beam 2.8,

我可以读取并向我的 kafka 融合集群生成消息,但不能通过上述代码。

0 投票
1 回答
255 浏览

apache-kafka - Confluent Cloud 上的 Apache Kafka - 分区主题和消费者滞后中的不连贯偏移

在 Confluent Cloud 上使用 Kafka 时,我发现了一个奇怪的行为。我创建了一个具有默认分区值的主题:6。

我的系统由一个向该主题发送消息的 Java Producer 应用程序和一个从中读取并执行每个消息操作的 Kafka Streams 应用程序组成。

目前我只启动 Kafka Streams 应用程序的一个实例,因此消费者组有一个成员。

这是我观察到的:

  1. 生产者发送一条消息,并将其记录在偏移量为 0的事件主题中:

在此处输入图像描述

  1. 消息到达 KStream,正在正确处理,正如我在 KStream 日志跟踪中看到的那样:

韩流

日志

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {"...

  1. 在 Confluent Cloud 消费者滞后中,我可以看到所有消费者组及其状态。KStream 有一个名为events-processor-19549050-d8b0-4b39.... 如前所述,该组只有一个成员(KStream 的唯一实例)。但是,如果显示该组在分区 2 中的一条消息后面。此外,请注意当前偏移量似乎为 1,结束偏移量为 2):

在此处输入图像描述

  1. 如果我在生产者中发送另一条消息,它会再次记录在主题中,但这次使用偏移量 2 而不是 1

在此处输入图像描述

  1. 消息到达KStream,再次正常处理:

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {

  1. 回到消费者组的消费者滞后,它仍然落后一条消息,仍然有一些奇怪的偏移量(当前 3,结束 4):

在此处输入图像描述

尽管处理似乎很好,但上面显示的状态并没有多大意义。能否解释一下原因:

  1. 消息偏移量增加 +2 而不是 +1?
  2. 即使正确处理了消息,消费者组似乎也落后了 1 条消息?
0 投票
1 回答
2393 浏览

apache-kafka - Spring Cloud Stream Kafka Stream 应用程序显示在每次重新启动时将分区 event-x 的偏移量重置为偏移量 0

我有一个 Spring Cloud Stream Kafka Stream 应用程序,它从一个主题(事件)中读取并执行一个简单的处理:

此应用程序使用来自 Confluent Cloud 的 Kafka 环境,其事件主题具有 6 个分区。完整的配置是:

KStream 正在正确处理消息。如果我重新启动应用程序,它们不会被重新处理。注意:我不希望它们被重新处理,所以这种行为是可以的。

但是启动日志显示了一些奇怪的位:

  1. 首先,它显示了恢复消费者客户端的创建。自动偏移重置无:
  1. 然后它最早创建一个具有自动偏移重置的消费者客户端。
  1. 启动日志的最终跟踪显示偏移量重置为 0。每次重新启动应用程序时都会发生这种情况:
  1. 配置了两个消费者的原因是什么?

  2. auto.offset.reset = earliest当我没有明确配置它并且Kafka默认值是最新的时,为什么第二个有?

  3. 我想要默认的(auto.offset.reset = latest)行为,它似乎工作正常。但是,这与我在日志中看到的内容不矛盾吗?

更新:

我会这样改写第三个问题:为什么日志显示分区在每次重新启动时都被重置为 0,尽管如此,没有消息被重新传递到 KStream?

更新 2:

我已经简化了这个场景,这次使用的是原生 Kafka Streams 应用程序。该行为与使用 Spring Cloud Stream 观察到的行为完全相同。但是,检查消费者组和我发现的分区是有道理的。

流媒体:

这是我所看到的:

1)对于一个空主题,启动显示所有分区重置为偏移量0:

2)我在主题中放入一条消息并检查消费者组,看到记录在分区 4 中:

3)我重新启动应用程序。现在重置只影响空分区(0、1、2、3、5):

4)我插入另一条消息,检查消费者组状态,同样的事情发生了:记录在分区 2 中,当重新启动应用程序时,它只重置空分区(0、1、3、5):

0 投票
1 回答
202 浏览

apache-kafka-connect - 融合云是否允许自定义连接器部署

  1. Confluent 云仅支持以下连接器。

    • GCS 接收器连接器
    • S3 Sink Connector 请验证我的理解是否正确。
  2. 我们可以在融合云中部署自定义连接器或像 debezium 这样的 cdc。

0 投票
3 回答
1185 浏览

java - Confluent Cloud Apache Kafka Consumer - 主题 [test-1] 存在/不存在且 missingTopicsFatal 为真

我是一个新手,试图使用 Confluent Cloud Apache Kafka 在两个 Spring Boot 微服务之间进行通信。

在 Confluent Cloud 上使用 Kafka 时,在 ServiceA 将消息发布到主题后,我的消费者(ServiceB)上出现以下错误。但是,当我登录到我的 Confluent Cloud 时,我看到消息已成功发布到主题。

当我在本地服务器上运行 Kafka 时,我不会遇到这个问题。ServiceA 能够将消息发布到我本地 Kafka 服务器上的主题,并且 ServiceB 能够成功地使用该消息。

我在 application.properties 中提到了我的本地 Kafka 服务器配置(作为注释掉的代码)

服务 A:生产者

应用程序属性

发件人.java

KafkaProducerConfig.java

服务 B:消费者

应用程序属性

KafkaConsumerConfig.java

KafkaConsumer.java

0 投票
1 回答
324 浏览

apache-kafka - Confluent Cloud 中的消息值未正确解码

我对卡夫卡和融合很陌生。我用我自己的虚拟模型编写了一个与https://www.confluent.fr/blog/schema-registry-avro-in-spring-boot-application-tutorial/上的教程几乎相同的 Producer。application.yaml 也是一样的。当我将消息发送到 ccloud - 收到的消息是乱码在此处输入图像描述

关于如何解决这个问题的任何想法?当我System.out.println在发送到 kafka 之前执行 avro POJO 时,该对象在所有正确的值下看起来都不错。

而当我从 ccloud 下载消息时,value看起来像这样

0 投票
1 回答
253 浏览

apache-kafka - Confluent Cloud -> BigQuery - 如何诊断“不良记录”原因

我可以将数据从 MSSql Server 推送到 Confluent Cloud 上的主题,但不能从主题推送到 BigQuery,它会引发错误“过去一小时的错误记录 - 65”

我可以将主题连接到 bigQuery,但无法摄取数据。

MSSQL 和 BigQuery 表格式相同 first(string) last(string) raj ram

我是否需要添加任何其他列来提取时间戳、偏移量等数据?

0 投票
0 回答
108 浏览

apache-kafka - Confluent KSQL“为此主题注册流或表”不填充主题字段

我有 4 个主题能够成功地将数据流式传输到,并且它们的架构看起来不错。

但是在单击主题>“KSQL 中的查询”后,我注意到其中只有 1 个填充了“您希望包含在 STREAM 中的字段”。其他 3 个显示为空白,如下所示:

在此处输入图像描述

我注意到能够填充 KSQL 字段的主题有一个名为 int 的代理主键id,而那些不起作用的主题有一个名为boxIdeg的自然字符串主键"ABC123",如果这与它有什么关系?

我缺少什么来填充这些字段?

0 投票
1 回答
484 浏览

java - 从 Apache Beam(GCP 数据流)写入 ConfluentCloud

我正在尝试使用以下命令从 Dataflow(Apache Beam)写入 Confluent Cloud/Kafka:

在哪里Map<String, Object> props = new HashMap<>();(即现在为空)

在日志中,我得到:send failed : 'Topic testtopic not present in metadata after 60000 ms.'

该集群上确实存在该主题 - 所以我的猜测是登录存在问题,这是有道理的,因为我找不到传递 APIKey 的方法。

我确实尝试了各种组合来将我从 Confluent Cloud 获得的 APIKey/Secret 传递给props上面的身份验证,但我找不到有效的设置。

0 投票
0 回答
979 浏览

python - 使用 AIOKafka 客户端连接到 Confluent Cloud

我正在尝试使用https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py的repo 中示例Confluent Cloud的修改版本连接到我的 Kafka 集群。我已经用我认为正确的参数配置了我的和,但出现以下运行时错误:AIOKafka ssl_consume_produce.pyAIOKafkaAIOKafkaConsumerAIOKafkaProducer

我改编的代码版本是:

我的混淆配置conf如下所示:

是否可以Confluent Cloud使用 AIOKafka 客户端连接?我的配置有什么不正确的吗?