问题标签 [confluent-kafka-dotnet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
347 浏览

c# - 有没有办法使用 Confluent.Kafka .Net 客户端查询主题的复制因子和保留时间?

Confluent.Kafka AdminClient 允许您创建一个主题,指定名称、分区数量、复制因子和保留(我猜其他设置通过 configs 属性)。然而,GetMetadata() 调用返回一个仅包含名称和分区信息的 TopicMetadata。有没有办法使用 .Net 客户端检索复制因子和保留时间?

0 投票
1 回答
308 浏览

apache-kafka - 如何使用 Kafka Confluent C# 客户端发送原始 Json?

我有一个消费者应用程序需要这样的 JSON {"timestamp":1554138000,"level":"first","message":"abc"}:. 但是 Confluent Message 对象似乎是字符串的键值对。我假设,消息 KV 对在内部序列化为 json。但是我怎样才能确保它不会为时间戳值加上双引号呢?有没有办法将原始 JSON 作为消息传递?

0 投票
2 回答
1645 浏览

c# - Kafka 消费者不使用来自现有主题的消息

我在 docker 上安装了 confluent kafka。在主题中,我有 10 个分区。问题是我无法使用来自该主题的消息,但我可以在该主题中生成消息。我正在尝试使用 C# confluent.kafka driver 1.5.1 (latest) 和 librd.kafka 1.5.0 (latest) 从主题中消费。

我启动kafka的docker-compose文件如下

我在 C# 中的使用者配置如下:

我确定主题分区中有消息,因为我可以使用 kafka 工具 2.0 检查主题 在此处输入图像描述

我用于 kafka 工具的配置是在此处输入图像描述在此处输入图像描述

我很确定我错过了配置文件中的某些内容,但是在阅读了 2 天的文档并将我的头撞到墙上后,我仍然找不到问题。那么有人可以帮忙吗?

0 投票
1 回答
294 浏览

c# - Confluent.Kafka - 主题日志压缩

我目前正在使用 Confluent.Kafka 构建发布者和消费者资产,并且我正在尝试了解我是否需要在代码中做任何不同的事情。我能够创建主题日志压缩,但我不完全了解如何在 C# .NET Core 中使用它。

我的主要问题是在创建启用了日志压缩的主题之后,是否必须在代码中完成任何操作才能使用它,或者是否全部在后台处理。

如果要编写特定于代码的方面,是否有人可以举个例子给我看?我已经研究了几天,发现了很多关于如何创建启用日志压缩的主题的信息(我已经实现了),但没有关于这可能如何影响生产者和消费者的代码使用的信息.

任何帮助将非常感激。

0 投票
0 回答
278 浏览

asp.net-core - Blazor Server - 如何通知用户会话使用和更新网页的每个 kafka 消息?

我是 Kafka 新手,正在使用 Kafka Dotnet Confluent,考虑到 ASP.NET Core Blazor Server Web 应用程序中的以下架构建筑学

由于与消费者相关的费用,我读过建议每个进程有一个单例 kafka 消费者。这在一个持续进程生命周期的长时间运行的任务中执行。

如果我希望登录的用户能够在应用程序页面上接收更新以监控从 kafka 主题收到的消息……我怎样才能让每个用户的范围服务自动接收消费消息的通知并更新页?

  1. 信号机?
  2. 将事件(对于每个消费的消息)发布到事件总线,例如。NServiceBus、Rebus 等?每个前端页面都会在收到从事件总线收到的事件通知时刷新.....
  3. API 作为 kafka 主题的前端。登录用户的每个前端页面然后频繁轮询 API 以刷新?
0 投票
1 回答
927 浏览

.net - 异步的Kafka生产者不返回DeliveryReport但DeliveryResult

我正在尝试将消息写入 Kafka,下面是我的生产者,如果我使用生产它有 DeliveryHandler 并且我可以访问 DeliveryReport,但是当我使用ProduceAsync时返回类型是 DeliveryResult 我如何获取 DeliveryReport 并记录失败的原因

使用生产:

在上面的代码中,我可以访问继承 DeliveryResult 的 DeliveryReport,并且可以访问 Error Reason 和 DeliveryResult --> TopicPartitionOffset,以下是元数据:

使用 ProduceAsync

在上述方法中,使用 ProducerAsync 时,我如何访问 DeliveryReport 以记录错误原因,就像 Produce 一样,当我在 ProducerAsync 上等待时,它返回 DeliveryResult 但不返回 DeliveryReport

此外,在写入 Kafka 时使用 Produce 或 ProduceAsync 也很好。

0 投票
1 回答
862 浏览

c# - 消息在 Confluent Kafka Dotnet 中丢失

我在我最近的 c# 项目中使用了 Confluent kafka 包。我通过以下方式创建了一个生产者:

但问题是我的一些消息没有到达消费者。他们正在某个地方迷路。但是,如果我对生产者使用等待,那么所有消息都会被传递。如何在不等待的情况下传递我的所有消息。(我有一个分区)

0 投票
1 回答
271 浏览

c# - 使用 Autofac C# 进行依赖注入 - 注册泛型类型时是否可以指定工厂方法

我对 DI 和 Autofac 很陌生。我有一个单例类ProducerService,如下所示。这在 BackgroundService 的构造函数中用作依赖项。ProducerService 有一个构造函数注入依赖项,IKafkaProducerAggregate下面也列出了。

如何创建单个实例IKafkaProducerAggregate<TKey, Tvalue>以注入 ProducerService<TKey, TValue>?这包含一个IProducer来自外部 Kafka 融合 dotnet 库的实例,通过工厂方法IProducer<TKey, TValue> producer = new ProducerBuilder<TKey, TValue>(Config.Producer).Build();。我可以在 Autofac 中使用某种工厂方法来创建聚合实例并IProducer使用 Kafka 融合工厂方法进行初始化吗?

到目前为止,我有一个注册生产者服务的 Autofac 模块,并且正在努力理解如何使用类似于工厂方法的东西???这将在创建 ProducerService 时创建一个 IKafkaProducerAggregate :

接口:IKafkaProducerAggregate

类:生产者服务

0 投票
1 回答
739 浏览

apache-kafka - 如何即时/启动时创建 Kafka 主题以供生产者发送?

我开始使用适用于 Kafka 的 Confluent .NET 库,并尝试实现我用于 Azure 服务总线的模式,以便在生产者应用程序启动时创建主题(如果不存在则创建)。这将如何在 Kafka API 中完成,并且可以完成吗?

这将允许主题成为源代码控制的一部分并在自动发布过程中进行配置,而不是手动设置每个主题/环境。此外,我希望我的开发人员不必去每个 Kafka 实例/环境并首先配置它们以匹配。

如果我不能这样做,我将不得不在发布过程中将其烘焙到 bash 脚本中,但更喜欢在启动代码中使用它。

0 投票
1 回答
202 浏览

kubernetes - srimzi operator 0.20 kafka 'useServiceDnsDomain' 没有效果

问题:由于某种原因,客户端 pod 只能解析完全限定的完全限定 DNS 名称,包括集群服务后缀。

这个问题在这个问题中说明了: AKS, WINdows Node, dns does not resolve service until fully qualified name is used

为了解决这个问题,我使用了 useServiceDnsDomain 标志。文档(https://strimzi.io/docs/operators/master/using.html#type-GenericKafkaListenerConfiguration-schema-reference)将其解释为

配置是否应使用 Kubernetes 服务 DNS 域。如果设置为 true,则生成的地址包含服务 DNS 域后缀(默认为 .cluster.local,可以使用环境变量 KUBERNETES_SERVICE_DNS_DOMAIN 进行配置)。默认为 false。此字段只能与内部类型侦听器一起使用。

我的部分yaml如下

这没有做任何事情,所以我也尝试添加 KUBERNETES_SERVICE_DNS_DOMAIN 如下所示

srimzi/operator:0.20.0 图像正在使用中。

在我的客户端(.net Confluent.Kafka 1.4.4)中,我使用 tt-kafka-kafka-bootstrap.shared.svc.cluster.local 作为 BootstrapServers。它给了我错误

错误:GroupCoordinator:无法解析“tt-kafka-kafka-2.tt-kafka-kafka-brokers.shared.svc:9092”:不知道这样的主机。

我期望代理服务向客户端提供全名,但从错误看来 useServiceDnsDomain 没有效果。

任何帮助表示赞赏。谢谢。