1

我正在尝试创建一个生产者 API,它应该只获取请求的主体并将其发送到 MSK 集群中的 kafka 主题。(消费者部分的问题几乎相同)

我在具有 3 个专用子网的 VPC 中创建了 MSK 集群。无服务器应用程序在同一 VPC 中运行,但另一个子网将 VpcConfig 添加到“serverless.template”文件中。我还在该文件上附加了策略“AWSLambdaFullAccess”和“AmazonMSKFullAccess”。双方的安全组都允许所有端口中的所有流量。

我在同一子网中创建了一个 EC2 实例,我可以使用命令“bin/kafka-console-producer.sh”成功连接并创建消息。

我的 dotnet 核心使用的是包“Confluent.Kafka”1.3.0 版。

代码如下:

            var config = new ProducerConfig { BootstrapServers = this.BootstrapServers };
            using (var p = new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    this.Logger.LogInformation($"Producer name: {p.Name}");

                    var dr = await p.ProduceAsync(this.TopicName, new Message<Null, string> { Value = message });

                    this.Logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                }
                catch (ProduceException<Null, string> e)
                {
                    var msg = $"Delivery failed: {e.Error.Reason}";
                    Console.WriteLine(msg);
                    this.Logger.LogCritical(msg);
                    throw new Exception(msg);
                }
            }

我正在尝试使用直接从 MSK 提供的 DNS 设置引导服务器,并添加前缀“plaintext://”

知道为什么我在“ProduceAsync”行中出现 TimeOut 吗?

我的猜测是这是因为库“Confluence.Kafka”在某种程度上与 MSK 不兼容,但我真的不知道这是否属实。

事先谢谢你。

亲切的问候,

4

0 回答 0