问题标签 [confluent-kafka-dotnet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
134 浏览

.net-core - Confluent Kafka .net core 批量消费轮询和自动提交

.net kafka 中没有 Poll 方法。我想轮询 50 条消息并通过 10 个线程并行处理。但它不支持 .

这是我找到的解决方案。

现在我可以收到 100 条消息。但是如何处理提交?如果我错了,请纠正我。

例如;

分区中有 300 条消息。

我将自动提交设置为 true,间隔为 5 秒;

它检查每个调用的 Consume 方法 5 秒。

第一次消费开始计时器。我在 while 循环中收到 99 条消息。这意味着我调用了 100 次消费方法。每个消耗方法检查时间为 5 秒。当我收到最后一条消息时。我花了500毫秒。还有 4.5 秒的时间来提交。

我花了 8 秒处理 100 条消息。但我还没有调用下一个消费方法。并且偏移量没有提交。当我下次消费时,消费者将承诺。偏移量将是 100。因为提交的时间到了。

当我再次消费时。计时器重新启动 5 秒。

那正确吗 ?

或者提交独立于消费方法。后台任务每 5 秒提交一次

0 投票
1 回答
146 浏览

c# - Confluent Kafka Consumer 没有收到任何东西

我遇到了以下代码的问题:

但是消费者不工作!正如我在 Debug 中看到的,它会冻结,直到 cancelationToken 过期,抛出 OperationCancelledException,我没有收到任何结果。

生产者似乎工作,它不会冻结或返回错误(使用类似的配置)。

为什么不起作用?

0 投票
2 回答
1100 浏览

c# - C# Confluent.Kafka SetValueDeserializer 对象反序列化

在我的消费者中,我想反序列化 Kafka protobuf 消息。键是字符串类型,但消息值是 protobuf 对象。我知道我必须为消息值创建自己的自定义反序列化器,但不知道如何创建一个。这是我的消费者实现,我需要替换标记的行:

Protobuf 消息格式:

0 投票
1 回答
344 浏览

c# - c# confluent.kafka 无法使用 Protobuf-net 反序列化 protobuf 消息

继续我之前的问题C# Confluent.Kafka SetValueDeserializer object deserialization,我尝试创建自定义反序列化器来反序列化 protobuf 消息,但收到此错误:

在线的:

这是我的消费者和反序列化器:

FinalValue.proto

最终值.cs

0 投票
1 回答
630 浏览

c# - Kafka:使用手动批处理消耗分区 - 正在跳过消息

我正在使用Confluent Kafka .NET为分区主题创建消费者。

由于 Confluent Kafka .NET 不支持批量消费,我构建了一个函数来消费消息,直到达到批量大小。此函数的想法是仅使用来自同一分区的消息构建批处理,这就是为什么一旦我使用具有不同分区的结果并返回到目前为止我能够使用的任何数量的消息时我停止构建批处理的原因.

目标或目的:我希望能够处理我在批处理中返回的消息,并仅提交这些消息的偏移量。IE:

从分区消费的消息 抵消 批量存储
0 0 是的
0 1 是的
2 0

从上表中,我想处理我从分区 0 获得的两条消息。来自分区 2 的消息将被忽略,并且(希望)稍后在对ConsumeBatch的另一个调用中拾取。

要提交,我只需调用同步Commit函数,将我处理的最新消息的偏移量作为参数传递。在这种情况下,我将传递上表中显示的批次的第二条消息的偏移量(分区 0 - 偏移量 1)。

问题:

问题是,由于某种原因,当我像上面显示的那样构建一个批处理时,由于验证而决定不处理的消息将被永远忽略。即:分区 2 的消息 0 将永远不会被消费者再次拾取。

正如您在下面的消费者配置中看到的那样,我将EnableAutoCommitEnableAutoOffsetStore都设置为 false。我认为这足以让消费者不对偏移量做任何事情,并且能够在另一个Consume调用中接收被忽略的消息,但事实并非如此。无论我的配置如何,偏移量都会以某种方式增加到每个分区的最新消费消息。

如果可能的话,任何人都可以告诉我我在这里缺少什么以实现所需的行为吗?

构建批处理的函数的简化版本:

ConsumerConfig 用于实例化我的消费者客户端:

附加信息: 正在测试:

  • 1 个主题,6 个分区,复制因子为 2
  • 3个经纪人
  • 1 个属于一个消费者组的单线程消费者客户端
  • Windows 10 上带有 wsl2 的本地环境
0 投票
0 回答
93 浏览

asp.net-core-webapi - Dotnet Kafka Consumer 按计划/按需暂停和恢复

我们有一个从 WebApi 项目运行的 Dotnet BackgroundService(Microsoft.Extensions.Hosting)。在 ExecuteAsync 方法中运行了一个 Kafka 消费者(对 Kafka 世界来说是相当新的)。我们希望我们的 Kafka 消费者根据时间表停止/开始消费(在非工作日停止,仅在工作日上午 8 点至下午 6 点之间运行等)。据我们所知,有几种方法可以完成这项工作。

  1. 根据计划运行后台服务。我们做了一些基础研究,但仍然不确定如何实现它。StartAsync并且StopAsync似乎与应用程序生命周期事件有关,并且不喜欢我们将能够用于我们的目的的东西。
  2. 使用consumer.pause()resume()根据时间表工作。我们如何通过 Dotnet 以编程方式实现这一点。我们无法在 CosumerBuilder 类上找到任何特定的事件处理程序(自定义计划事件)。任何关于什么是最佳选择以及如何实现它的方向的任何建议/建议将不胜感激。
0 投票
0 回答
423 浏览

apache-kafka - 用于代理验证的 Kafka ssl ca 位置

我正在使用基于 librdkafka 的 .net 的融合 kafka 库。根据此处的文档:https ://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#ssl 如果未设置 ssl.ca.location,它将探测默认路径。当我没有明确设置它时,它使用“/etc/ssl/certs/ca-certificates.crt”并且工作正常。

现在出于验证目的,我从 /etc/ssl/certs 中删除了所有其他证书,并在 /etc/ssl/certs 中放置了一个新文件(比如“kafka_root_ca.pem”)。根据没有明确设置 ssl.ca.location 的文档,它应该自动探测路径“/etc/ssl/certs”中的证书,但事实并非如此。甚至尝试设置 ssl.ca.location = 'probe' (根据融合库的建议),但它仍然没有从上面的文件夹中选择。只有当我设置绝对路径“/etc/ssl/certs/kafka_root_ca.pem”时它才有效。

我是否遗漏了什么,因为根据上述文档,如果未设置,它应该自动从 /etc/ssl/certs 文件夹中选择。

0 投票
1 回答
139 浏览

.net-core - 使用 Kafka + .Net Core + MassTransit 实现请求响应模式

有人可以告诉我如何使用带有 .net 核心(2.1,2. 或 3.1)的 kafka 实现请求响应模式。我更喜欢使用轻量级消息总线 MassTransit 来实现这种模式。我无法在他们的文档中找到参考实现。可能是我错过了。有人可以分享一些使用 MassTransit 的标准参考实现。如果找不到,任何使用 Confluent Kafka 实现的示例(请求响应模式的实现)也很值得一看。

谢谢大家

0 投票
1 回答
127 浏览

.net-core - 为什么不建议使用 Kafka 实现请求/响应模式

有些人不建议在微服务世界中使用 kafka 来实现请求/响应模式。

但我还不清楚为什么不推荐它?同时,我看到 spring kafka 开箱即用地支持这种模式,而 .net 没有通过任何库(既不通过 confluent kafka 也不通过 MassTransit)开箱即用地提供此功能。如果不推荐通过 kafka,我们是否有任何其他标准方法可以通过 .net 核心实现这种情况。有人可以为我提供一个合理的技术答案来说服自己吗?

更新

我正在编辑我的问题以解释业务用例假设我需要显示我的帐户余额。我从前端向api控制器发出请求[http请求],然后会触发三个事件来获取储蓄账户余额/固定账户余额/活期账户余额。

这些事件将发布到三个 kafka 主题,然后由三个消费者监听。这些消费者分别处理请求并发布到另外三个主题。将等待初始 http 调用,直到这些主题被消耗。一旦它被消耗 http 调用响应前端。

应该等待 resultOneTask 和 resultTwoTask 直到响应主题被消耗。如果 resultOneTask 和 resultTwoTask 是 http 调用,则没有实现它的问题。此用例甚至还支持 RMQ。但我无法找到任何使用 confluent kafka 或 MassTransit for kafka 的标准实现。

提前致谢

0 投票
1 回答
42 浏览

apache-kafka - 基于分区Key的主题分区

我正在尝试根据分区键将流拆分为多个分区,但显然它不起作用。实现是这样的,我有一个类可以说Metrices

度量标准将一次异步使用一个,并且可能具有不同的MetriceType. 我想要实现的是在MetriceType. 到目前为止我所尝试的。

  1. 将消息键(partitionKey)设置为MetricesType

消息总是在Partition.Value= 0发布

  1. Confluent.Kafka 库中的 Partitoner 类,希望有类似于 Custom Partitoner的东西,这个链接但找不到任何 .net 实现。

所以我的问题是,在这种情况下,有没有办法根据属性拆分我的收入消息,MetriceType并将它们发布在他们的专用分区上(排序是必不可少的),或者我唯一的选择是使用和AdminClient编码创建一个主题分区计数,或者我可以研究另一种方法。提前致谢。