问题标签 [confluent-cloud]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
259 浏览

apache-kafka - 如何使用来自 Apache Kafka 的数据

参与挑战,它说:您的第一步 - 使用来自 Apache Kafka 的数据样本。 所以他们给了我主题名称API_KEYAPI_SECRET。哦,还有引导服务器。然后他们声称好像您不熟悉 Kafka,Confluent 提供了全面的文档。好吧,登录到 confluent,创建一个集群,然后.. 消费数据的下一步是什么?

0 投票
1 回答
53 浏览

apache-kafka - 在kafka res api响应中获取垃圾值

如果来自融合云上的 kafka 实例,我正在使用 rest api 来获取主题

我正在使用以下 curl 命令

但我得到了op.txt像垃圾一样的价值

有什么解决办法吗?

0 投票
0 回答
303 浏览

apache-kafka - 知识库 | 消费滞后 | 汇合云|

我在生态系统中使用 kafka 融合云作为消息队列。有 2 个主题,A 和 B。B
中的消息在 A 的消息发布后稍晚到达。(延迟 30 秒)

我正在使用 ksql 加入这两个主题,ksql 服务器部署在本地并连接到融合云。在 KSQL 中,我将这两个主题作为基于公共标识符的流加入,比如 requestId 并创建一个新的流 C。C 是加入的流。

有时,C steam 显示它已经产生了一个延迟,它没有处理 A 和 B 的消息。这个延迟在融合云 UI 中是可见的。当我登录到 ksql 服务器时,我可以看到以下错误,并且在重新启动 ksql 服务器后一切正常。这会在 2-3 天内间歇性发生。

这是我在本地部署的 ksql 服务器中的配置。

ksql 服务器日志中的错误消息。

编辑 :

  • 在此异常期间。我已经验证 ksql 服务器有足够的 RAM 和 CPU
0 投票
1 回答
537 浏览

mongodb - 使用 MongoDB Sink Connector 通过不同的主键更新现有文档

我正在尝试通过 Confluent Cloud 设置 MongoDB Sink 连接器,以保持 pgsql 和 MongoDB 之间的数据同步。

我期望下面的配置基于id(int) 字段(不是 _id - objectId)更新现有文档,但是它只是在使用时在 MongoDB 中创建一个新文档。来自 pg 的文档将不包含 _id 字段,因此我们需要在我们的 pgsql 主键 (id) 上完成查找。

任何想法为什么这不能像我预期的那样工作?

0 投票
0 回答
904 浏览

apache-kafka - 引起:org.apache.kafka.common.errors.SerializationException: Error registering Avro schema

我有一个管道流,我将 debezium CDC mysql 连接器从 confluent 平台连接到 Confluent Cloud,因为云内置 debezium mysql 连接器处于预览状态,我已成功建立连接,并且来自主题的消息由 S3 sink 连接器订阅. 最初我有 json 格式的流,但后来我希望它是 AVRO 格式,因此我更改了键和值转换器的连接器配置文件,如下所示:

Debezium 连接器 json:

################################################# ##################

连接-distributed.properties:

################################################

-- 我通过 --> bin/connect-distributed etc/connect-distributed.properties 启动 kafka 连接

- 连接启动良好,但是当我尝试使用 curl 命令加载 debezium 连接器时,它显示以下错误“未经授权”,但我提供的 api 密钥和秘密是正确的,我也使用 cli 手动检查了它.

引起:org.apache.kafka.connect.errors.DataException: staging-development-rds-cluster at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:78) at org.apache.kafka.connect。 runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) at org.apache.kafka.connect.runtime.errors .RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 11 更多原因:org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":" SchemaChangeKey","namespace":"io.debezium.connector.mysql","fields":[{"name":"databaseName","type":"string"}],"connect.name":"io.debezium.connector.mysql.SchemaChangeKey"} 原因:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; 错误代码: io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235) 在 io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209) 的 401 .confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326) 在 io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318) 在 io.confluent.kafka .schemaregistry.client.rest.RestService.registerSchema(RestService.java:313) 在 io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:119) 在 io。confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:156) 在 io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) 在 io.confluent.connect.avro.AvroConverter$Serializer。在 org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266) 处序列化(AvroConverter.java:117) ) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) 在 org. apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:266) 在 org.apache .kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) 在 org.apache.kafka.connect.runtime.WorkerTask .run(WorkerTask.java:219) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util。 concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) [2020-11 -30 05:30:47,389] 错误 WorkerSourceTask{id=mysql_deb3-0} 任务被杀死,直到手动重新启动才能恢复(org.apache.kafka.connect.runtime.WorkerTask:178)[2020-11-30 05:30:47,389] 信息停止连接器 (io.debezium.connector.common.BaseSourceTask:187) [2020-11-30 05:30:47,389] 信息停止 MySQL 连接器任务 (io.debezium.connector.mysql.MySqlConnectorTask:458)

请大家帮我解决这个问题。提前致谢

0 投票
1 回答
507 浏览

apache-kafka - 如何关闭正在运行的 Confluent Cloud 集群?

在 Confluent ( https://confluent.cloud ) 上似乎没有关闭 Kafka 集群的选项:

在此处输入图像描述

我试图限制我的成本并关闭集群。我已经探索了上面屏幕截图中的各种选项,但我没有找到可以关闭集群的位置。

0 投票
0 回答
454 浏览

amazon-web-services - 将 sarama 消费者组连接到 AWS 上托管的融合云 kafka。没有 AMS

这是我第一次尝试连接到托管在 AWS 服务器上但不使用任何 Amazon Managed Streaming 服务的 Confluent Kafka 集群。

使用此代码(减去 Config.Net.SASL 设置),我能够毫无问题地连接到旧的自托管集群。

我无法弄清楚我做错了什么。

连接时,我收到此错误:

这是我的代码:

0 投票
1 回答
382 浏览

prometheus - 如何将 Confluent 云指标发送到 Datadog?

我想将 Confluent 云指标导入 Datadog,所以我遵循了这个指令。而不是使用 CCLOUD_USER: ${CCLOUD_USER} 和 CCLOUD_PASSWORD: ${CCLOUD_PASSWORD} 我使用 CCLOUD_API_KEY 和 CCLOUD_API_SECRET 作为导出容器的环境变量。

我收到无法建立新连接:[Errno 111] 连接被拒绝错误:

当我尝试 curl http://ccloudexporter_ccloud_exporter_1:2112/metrics 时,我没有得到回复,但我使用 curl 到 http://localhost:2112/metrics。所以我调整了 openmetrics.yml 以使用 prometheus url http://localhost:2112/metrics。DD 容器中的错误仍然相同。当我在浏览器中访问 http://localhost:2112/metrics 时,我看到了指标。

不知道为什么 DD 无法连接到 /metrics。

0 投票
2 回答
657 浏览

apache-kafka - org.apache.avro.AvroTypeException:未知的联合分支 EventId

我正在尝试使用“kafka-avro-console-producer”将 json 转换为 avro 并将其发布到 kafka 主题。

我可以做那个扁平的 json/schema,但是对于下面给定的 schema 和 json,我得到“org.apache.avro.AvroTypeException:Unknown union branch EventId”错误。

任何帮助,将不胜感激。

架构:

Json 输入如下:

在我的情况下,我不能改变模式,我只能改变 json 这样它的工作方式

0 投票
1 回答
319 浏览

apache-kafka - 使用 Apache Kafka 连接器 4.5.0 在 Mule 4 和 Confluent Cloud 之间终止连接,但使用 3.0.7 连接

使用 Mule 4 和 Confluent Cloud 设置(非常简单的)POC: 在此处输入图像描述

我无法使用最新版本的 Mule 4 Apache Kafka 连接器 (4.5.0) 建立成功连接。如果我将它降级到 3.0.7 并使用相同的配置,它就可以正常工作。为什么是这样?

工作的 3.0.7 配置(对于基本生产者)如下所示:

失败的 4.5.0 配置(也适用于基本生产者)如下所示:

你可以看到他们俩:

  • 使用 SASL 纯文本连接
  • 有HTTPS的SSL端点识别算法
  • 指定相同的引导服务器、API 密钥和机密

HTTP listener除了 an和 a之外,流程中几乎没有其他内容Set Payload

使用早期连接器版本发送的消息可以正常到达 Confluent Cloud 主题,但是使用应用程序无法启动并递归打印错误,例如:

和堆栈跟踪End of File Exception

其中(查看 Apache 源代码)看起来像一个零字节消息响应。