问题标签 [sarama]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
203 浏览

go - 如何为 kafka 消费者添加自定义消息反序列化器(使用 sarama lib)

序列化消息由 java 微服务生成,我需要在另一个用 golang 编写的服务中使用相同的消息 我在 golang
中为 kafka 消费者使用 sarama lib,我收到消息,但由于通过生成服务进行序列化而导致其失真
需要帮助在 Go 中为消费者添加自定义反序列化器以获取传入消息。

0 投票
1 回答
652 浏览

unit-testing - GoLang Sarama ConsumerGroup 模拟

我是 Go 新手,我也在努力模拟这个电话:sarama.NewConsumerGroup(brokers, group, config)

我正在使用 testify,我的模拟代码目前看起来像:

但我得到了错误:

我相信我错误地嘲笑了电话,但现在确定还能做什么。

0 投票
1 回答
106 浏览

json - 将 JSON 对象数组解析为单个文档

我正在收集代理生成的日志。它会生成一个较大的 JSON 输出,我需要将其分解为较小的 JSON 文档并使用 sarama 写入 kafka。由于 kafka 消息的最大大小限制,我在分解成几个单独的 JSON 文档时遇到了问题。任何建议将不胜感激。除了指示日志活动的日期/时间字段外,日志消息没有任何固定字段或数据类型

样品 #1

样品 2

我能够遍历单个消息,但不能以友好的格式写入 kafka。

0 投票
1 回答
246 浏览

apache-kafka - Sarama ClusterAdmin 连接问题 - 管道损坏

我正在使用 sarama(1.27) ClusterAdmin 来管理 kafka1.1.0 中的主题。我管理 kafka 主题的应用程序作为 REST 服务运行。我的应用程序运行良好一段时间,我可以获取/创建/删除主题。

但是在没有任何活动的情况下经过一段时间后,一个新的主题请求会出错 - write tcp xxxxx:37888->xxxxx:9092: write: broken pipe。

我遇到了这个How to fix broker may not available after broken pipe

由于我的应用程序作为服务运行,如何防止管道损坏问题?我仅在应用程序退出时关闭 ClusterAdmin。相同的 ClusterAdmin 连接用于服务所有请求。如果出于任何原因它为 nil,我会为每个请求重新初始化 clusterAdmin(通常在第一次初始化后它不是 nil,因此重用相同的连接)。

我应该在处理每个请求后关闭 clusteradmin 并为每个主题请求打开一个 NewClusterAdmin(),还是需要使用 keepalive 选项?

这是我现有的代码:

0 投票
0 回答
1125 浏览

go - Sarama Kafka 库:如何对消费者组的 session.MarkMessage() 进行单元测试?

我正在尝试从消费者组示例中调整代码github.com/Shopify/sarama,并且正在努力添加一个单元测试来测试方法session.MarkMessage()中的功能ConsumeClaimhttps://github.com/Shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup /main.go#L160)。

这是我改编的带有consume()函数的代码:

这是我为它编写的几个单元测试:

可以在 Docker 容器中运行 Kafka 和 Zookeeper 后运行测试,例如johnnypark/kafka-zookeeper

我正在努力解决以下问题:如果我注释掉该行

测试仍然通过。根据https://godoc.org/github.com/Shopify/sarama#ConsumerGroupSessionMarkMessage将消息标记为已使用,但我将如何在单元测试中对此进行测试?

0 投票
1 回答
526 浏览

go - 来自 GitHub.com/Shopify/sarama 的测试日志输出

github.com/Shopify/sarama我正在尝试为 configure's的功能选项编写单元测试Logger。像这样使用 Kafka 运行 Docker 容器后,

我正在尝试运行这个程序:

我希望看到一些输出。但是,打印输出为空:

相比之下,如果我将输出设置os.Stderr

我看到打印到终端的预期输出:

似乎*bytes.Buffer没有被“冲走” ioutil.ReadAll()?如何修复前面的示例,使其output不为空?

0 投票
1 回答
420 浏览

go - 如何为 protbuf 使用自描述消息

我在使用协议缓冲区时正在处理的用例之一是反序列化我在消费者端收到的协议缓冲区 Kafka 消息(使用 sarama 库和 Go)。

我目前的做法是我定义了示例 pixel.proto 文件,如下所示。

我正在通过 sarama.Producer 发送消息(通过编组)接收它 sarama.Consumer (通过引用已编译的 pixel.proto.pb 来解组消息)。代码如下。

如您所见,在代码中,我已导入 .proto 文件并在主函数中引用它,以便发送和接收消息。这里的问题是,解决方案不是通用的。我会在消费者端收到不同的 .proto 类型的消息。

我怎样才能使它通用?我知道作为 protobuf 的一部分,有一种叫做自描述消息(动态消息)的东西。我提到了这个链接https://developers.google.com/protocol-buffers/docs/techniques?csw=1#self-description。但它没有任何解释如何将其嵌入为 pixel.proto 的一部分(我使用过的示例),以便在消费者端我直接将其反序列化为所需的类型。

0 投票
2 回答
1038 浏览

go - 使用 Sarama 单独或批量提交消息 - 用于 Go 的 kafka 客户端

我试图让 kafka 消费者在特定时间内收集消息,之后我可以手动提交已收集的消息。但是我从shopify sarama找不到可以用来提交消息或一批消息的方法或api,请帮忙

0 投票
1 回答
482 浏览

go - 在 Sarama 中重命名 kafka 消费者组?

我有一个用例,我想将其重命名consumer_group_2consumer_group. 我想这样做是因为我想从上次消费的偏移量开始消费,consumer_group_2但我想将组 ID 从重命名consumer_groupconsumer_group_2

Sarama消费群有可能吗?

0 投票
0 回答
454 浏览

amazon-web-services - 将 sarama 消费者组连接到 AWS 上托管的融合云 kafka。没有 AMS

这是我第一次尝试连接到托管在 AWS 服务器上但不使用任何 Amazon Managed Streaming 服务的 Confluent Kafka 集群。

使用此代码(减去 Config.Net.SASL 设置),我能够毫无问题地连接到旧的自托管集群。

我无法弄清楚我做错了什么。

连接时,我收到此错误:

这是我的代码: