问题标签 [confluent-kafka-dotnet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
335 浏览

masstransit - 将 MassTransit 与 Kafka 和内存总线一起使用时,应用程序退出期间出现 AccessViolationException

我将 MassTransit 7.1.4 版与 .Net Core 3.1、ASP.NET Core Web 应用程序一起使用。当我通过Ctrl+关闭应用程序时会发生这种情况C。当我按下停止调试按钮时,它不会发生。负责注册的代码如下:

例外:

当我注释掉主题注册或使用 RabbitMq 作为总线时,它已经停止发生。我知道内存总线主要用于测试,但是现有的 Kafka 基础设施,所以运行另一个基础设施没有意义。

0 投票
1 回答
137 浏览

c# - 连接到本地 Kafka pod

我是 k8s 的新手,已使用 helm install 命令将 Kafka 安装到本地集群,并且已成功安装并使用

掌舵列表

并使用

Kubectl 获取所有 -A

命令作为运行。

我在我的 C# 项目中安装了 Confluent.Kafka 块包并尝试连接到 pod,但它没有使用 localhost:13090 连接并且没有给出错误消息

请注意,pod 的命名空间是“default”,而应用程序 pod 的命名空间是“my-pod”</p>

请指教,谢谢

0 投票
1 回答
531 浏览

c# - Confluent Kafka 为偏移位置返回 -1001

我正在尝试Kafka使用Confluent Kafka.

这是我用来获取它的代码:

不过,它总是给我一个 -1001 的值。我究竟做错了什么?

附加信息

我认为这可能是因为它是Unset。这就是文档所说的:

如果此消费者没有使用先前的消息,则取消设置。

我不确定我应该怎么做。

0 投票
0 回答
298 浏览

.net - 使用来自多分区主题(批处理)的新消息的 Kafka .NET 消费者

我创建了一个 kafka 消费者,它在单个分区主题的批处理上执行。每隔 15 分钟,我的批处理将执行并使用已发布到主题的所有新消息。在当时消费完所有可用消息后,批处理将退出。

以下是我如何实现此行为的示例代码:

一旦主题转到多分区,上述逻辑将不再起作用,因为到达一个分区的末尾将提前退出批处理应用程序。有没有一种方法可以遍历主题上的分区并在分区的基础上使用消息?

我在网上找到的大多数示例都是面向服务的 Kafka 消费者,当消息发布到主题时,它们会无限轮询和消费。不幸的是,我的情况有点独特,这要求我有一个明确的条件,以便在所有消息都被消耗完并退出批处理应用程序后停止读取新消息。任何帮助将不胜感激!

0 投票
0 回答
132 浏览

.net - 查看加密的 Kafka 消息中的标头

在我的 Confluent Kafka .NET 应用程序中,我有一个生产者,它使用拦截器在发送消息之前对其进行加密。我想知道是否可以将标头数据添加到可以在消费者解密消息之前读取的加密消息。加密由AwsCrypto.

为了检查这一点,我让我的消费者在 Consume() 之后输出以下日志条目:

这是我在日志中看到的:

消息头:Confluent.Kafka.Headers

如何查看实际的标头数据?

0 投票
0 回答
201 浏览

c# - Kafka c#:自定义分区器和分配器实现

我需要使用 c# Confluent 客户端库实现自定义 Partitioner 和 Assigner。使用 apache java 客户端 API 在 java 应用程序中实现了相同的功能:

  1. 隔板:

    • 生产者配置:

      props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyCustomerPartitioner.class.getName());

    • MyCustomerPartitioner 通过重写 configure、partition、onNewBatch 和 close 方法来实现 Partitioner。

  2. 分配者:

    • 消费者配置:

      props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, MyCustomAssignor.class.getName());

    • MyCustomAssignor 通过覆盖 name、configure、subscriptionUserData 和 assign 方法扩展 AbstractPartitionAssignor 实现 Configurable。

上述java实现使用spring-kafka 2.6.7。

我无法在 c# Confluent kafka 版本中找到 API:1.4.0。用于 Partitioner 和 Assigner 的自定义实现。

观察:

  1. 作为 ProducerConfig 一部分的 Partitioner 是枚举类型,具有 Random、Consistent、ConsistentRandom、Murmur2 和 Murmur2Random 值

  2. 分配器:ConsumerConfig 中使用的 PartitionAssignmentStrategy 是枚举值:Range 和 RoundRobin。

请求您让我知道我可以使用 c# Confluent kafka 库来实现自定义分区器和分配器的 API。

0 投票
0 回答
153 浏览

apache-kafka - 使用 kafka 端点提高事件中心的性能

我们正在使用融合的 Kafka 库 (1.5.1) 评估带有 Kafka 端点的 azure 事件中心,并且我们观察到高 RTT(平均延迟)、消费者获取周期时间,有时再平衡计数也会增加。这些指标使用统计处理程序记录。

以下是消费者配置:

注意:消费者获取周期时间较长的原因之一是消息提交 (_streamConsumer.Commit) 花费的时间比应用程序处理消息的时间要长。

相同的代码和配置适用于 Kafka 代理,但不适用于带有 Kafka 端点的事件中心。您能否分享您的建议以提高性能?

0 投票
0 回答
732 浏览

c# - Confluent-kafka-dotnet - 在互联网连接丢失几秒钟后,消费者在提交偏移量时挂起

描述

我正在使用这个 kafka 客户端 - https://github.com/confluentinc/confluent-kafka-dotnet 在互联网连接丢失几秒钟后提交偏移量时,消费者会挂起。

如何重现

我有正在运行的消费者 - 一切都很好。但是,如果我关闭 Wifi 并在我的笔记本电脑上再次打开(我本地机器上的消费者 - 但生产相同) - 它会消耗第一个消息 - 并挂在提交 - 结果 - 僵尸消费者线程。我尝试在具有超时的单独线程中运行 Commit - 在超时的情况下 - 关闭当前消费者 - 并重新创建它 - 但它挂在 Close() 方法上(相同 - 使用 Unassign 和 Unsubscribe 和 Dispose 方法)

唯一可行的方法-我在带有超时的单独线程中运行 Commit()-在超时的情况下-只需重新创建另一个使用者循环(线程)-无需调用 Close()、Unassign、Unsubscribe、Dispose 方法。但是每次我做这个技巧时线程的数量都会增加 - 可能是因为死锁的线程。

也许类似 - https://github.com/dpkp/kafka-python/issues/1728 而这个 - https://github.com/confluentinc/confluent-kafka-dotnet/issues/1552

[ 1.6.3(但 1.5.3 - 相同)] Confluent.Kafka nuget 版本。[Windows 10 家庭版] 操作系统。

消费者配置:

日志: 这里一切正常:

.............

[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:在偏移量 25 (v4) 处获取主题 vad-UpdateFrameworkFromImportCommand [1] [thrd:sasl_plaintext://94.131 .241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:在偏移量 0 (v4) [thrd:sasl_plaintext://94.131.241.251:9094/bootstrap] 处获取主题 vad-ImportFrameworksConsumerRetry [0]: sasl_plaintext://94.131.241.251:9094/2:获取 2/2/2 toppar(s)

Wifi 已关闭:

[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:断开连接(在 18793 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9094/bootstrap ]:sasl_plaintext://94.131.241.251:9094/2:获取回退 500 毫秒:本地:代理传输失败 [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/ 1:断开连接(在 18781 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/1:获取回退 500 毫秒:本地:代理传输失败 [thrd :sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3: 断开连接(在 24894 毫秒后处于 UP 状态)[thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:获取 500 毫秒的退避:本地:代理传输失败 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理关闭:重新查询 [thrd:main] :主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1] :代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:GroupCoordinator]:GroupCoordinator:94.131.241.251:9093:已断开连接(在 24050 毫秒后处于 UP 状态) [thrd:main]: Group "ImportFrameworksConsumer" 改变了状态 -> query-coord (join-state stable) [thrd:main]: Group "ImportFrameworksConsumer": 没有可用于协调器查询的代理:在状态查询坐标 sasl_plaintext://94.131.241.251:9094/2 中间隔:断开连接(在状态 UP 18793 毫秒后)Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9093/ 1:断开连接(在状态 UP 18781 毫秒后)Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9095/3:断开连接(在状态 UP 24894 毫秒后)Confluent.Kafka.Consumer2[ System.String,System.String] GroupCoordinator: 94.131.241.251:9093: 断开连接(在 24050 毫秒后处于 UP 状态) Confluent.Kafka.Consumer2[System.String,System.String] 4/4 代理关闭 Confluent.Kafka.Consumer2[ System.String,System.String] [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询-coord [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭: 重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [ thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd: main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态间隔查询-coord [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1] :代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询[thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在state query-coord [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main ]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1 ]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有代理可用于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd :main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer”:没有可用的代理对于协调器查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器的代理查询:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [ 1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询抛出异常:System.dll 中的“System.Net.Sockets.SocketException”抛出异常: System.dll 中的“System.ObjectDisposedException” 引发的异常:System.dll 中的“System.Net.WebException” 引发的异常:System.dll 中的“System.Net.WebException” 引发的异常:System.dll 中的“System.ObjectDisposedException”异常抛出:System.dll 中的“System.Net.WebException”异常抛出:Elasticsearch.Net 中的“Elasticsearch.Net.ElasticsearchClientException”。dll [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔[ thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd:main]:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标中间隔 [thrd: main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [ 2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main] :组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标[thrd:main]中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标[thrd:main]中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:主题 vad- ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [ thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [thrd:main]:组“ImportFrameworksConsumer “:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态查询坐标 [thrd:main] 中间隔:组“ImportFrameworksConsumer”:没有可用于协调器查询的代理:在状态间隔查询-坐标 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-ImportFrameworksConsumerRetry [2]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [0]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [1]:代理已关闭:重新查询 [thrd:main]:主题 vad-UpdateFrameworkFromImportCommand [2]:代理已关闭:重新查询 [ thrd:main]:消费者组会话在 10184 毫秒后超时(加入状态稳定),没有来自组协调器的成功响应(代理 1,最后一个错误是成功):撤销分配并重新加入组 [thrd:sasl_plaintext:// 94.131.241.251:9094/bootstrap]:sasl_plaintext://94.131.241.251:9094/2:连接到 ipv4#94.131.241.251:9094 失败:未知错误(在 CONNECT 状态下 21036 毫秒后)sasl_plaintext://94.131.241.251:9 /2:连接到 ipv4#94.131.241.251:9094 失败:未知错误(在 CONNECT 状态下 21036 毫秒后)Confluent.Kafka.Consumer2 [System.String,System.String] [thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:连接到 ipv4#94.131.241.251:9095 失败:未知错误(在状态 CONNECT 21040 毫秒后)[thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext: //94.131.241.251:9093/1:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态下 21041 毫秒后) sasl_plaintext://94.131.241.251:9095/3:连接到 ipv4#94.131.241.251: 9095 failed: Unknown error (after 21040ms in state CONNECT) Confluent.Kafka.Consumer2[System.String,System.String] sasl_plaintext://94.131.241.251:9093/1: Connect to ipv4#94.131.241.251:9093 failed: Unknown错误(在 CONNECT 状态下 21041 毫秒后)Confluent.Kafka。Consumer2 [System.String,System.String] [thrd:GroupCoordinator]:GroupCoordinator:94.131.241.251:9093:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态 21054 毫秒后)GroupCoordinator:94.131.241.251: 9093:连接到 ipv4#94.131.241.251:9093 失败:未知错误(在 CONNECT 状态 21054 毫秒后)Confluent.Kafka.Consumer`2[System.String,System.String]

Wifi 开启:

[thrd:sasl_plaintext://94.131.241.251:9095/bootstrap]: sasl_plaintext://94.131.241.251:9095/3:在偏移量 0 (v4) 处获取主题 vad-ImportFrameworksConsumerRetry [1] [thrd:sasl_plaintext://94.131 .241.251:9095/bootstrap]:sasl_plaintext://94.131.241.251:9095/3:在偏移量 18 (v4) [thrd:sasl_plaintext://94.131.241.251:9095/bootstrap] 处获取主题 vad-UpdateFrameworkFromImportCommand [2]: sasl_plaintext://94.131.241.251:9095/3:获取 2/2/2 toppar(s) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]:sasl_plaintext://94.131.241.251:9093/1:在偏移量 9 (v4) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap] 处获取主题 vad-UpdateFrameworkFromImportCommand [0]:sasl_plaintext://94.131.241.251:9093/1:获取主题 vad-ImportFrameworksConsumerRetry [2]在偏移量 0 (v4) [thrd:sasl_plaintext://94.131.241.251:9093/bootstrap]: sasl_plaintext://94.131.241.251:9093/1:获取 2/2/2 个顶杆

0 投票
0 回答
69 浏览

c# - 在 kafka 中手动提交偏移量

我有一个带有相应分区的偏移量列表,我需要手动提交它们。为此,我遍历列表并将分区分配给消费者,然后寻找特定的偏移量。然后我正在使用消息并将 ConsumerBulider 传递给 commit 方法。有时它执行顺利,但有时它会抛出“Local:Waiting for Coordinator”异常。但是在这两种情况下,当我之后尝试使用消息时,我会重新使用我已经提交的同一系列消息,或者我应该说我尝试过提交。这意味着我永远不会真正提交他们:(

消费者/客户端配置:

我正在使用 Confluent.Kafka 1.6.2 版本和 .net5

有人可以帮我吗?

0 投票
1 回答
333 浏览

azure - Azure 事件中心是否限制主题数量?

我正在尝试将 Azure 事件中心用作使用 Confluent Kafka sdk 的消息总线。当我开始测试我的代码时,它可以工作,但是在我创建了 10 个主题之后,我无法再创建任何新主题。我正在使用 IAdminClient.CreateTopicsAsync 创建主题。我可以使用 MetaData.Topics.ToList() 查看我创建的前 10 个主题仍然存在,只是无法创建任何新主题。我使用 IAdminClient.DeleteTopicsAsyn 删除了我的一个主题,之后我能够添加 1 个新主题,使总数恢复到 10。我仍然无法创建第 11 个。是否有某种限制可以让我创建 10 个主题,但不能创建第 11 个?这是我可以通过简单的配置更改来增加的东西吗?谢谢你。

CreateTopicsAsync 引发的错误:

CreateTopicsException:创建主题时出错:[TopicName11]:[服务无法处理请求;请重试操作。有关异常类型和正确异常处理的详细信息,请参阅 http://go.microsoft.com/fwlink/?LinkId=761101]。