0

我是 Golang 和 Kafka 的新手,我正在使用 segmentio kafka-go 使用 Golang 连接到 Kafka 服务器。到目前为止,我想在 Kafka 中推送用户的每个事件,所以我想推送单个消息(而不是批量),但是由于该库提供的写操作对于批量或单个消息都需要相同的时间,所以它需要很多时间。有什么方法可以快速编写单个消息,以便我可以在更短的时间内在 kafka 中推送数百万个事件?

我已经针对单个消息和批处理消息对其进行了测试,它需要相同的时间(最短为 10 毫秒)。

4

2 回答 2

4

我认为您的问题只是 WriterConfig。

例如,如果您的配置看起来像 segmentio/kafka-go 文档中的示例:

w := kafka.NewWriter(kafka.WriterConfig{
    Brokers:      []string{"localhost:9092"},
    Topic:        "topic-A",
    Balancer:     &kafka.LeastBytes{},
})

您可以尝试设置批量大小和批量超时:

w := kafka.NewWriter(kafka.WriterConfig{
    Brokers:      []string{"localhost:9092"},
    Topic:        "topic-A",
    Balancer:     &kafka.LeastBytes{},
    BatchSize:    1,
    BatchTimeout: 10 * time.Millisecond,
})

发生这种情况是因为 kafka-go 默认等待 1 秒,直到批处理达到最大大小,默认为 100 条消息,正如我们在代码中看到的那样。

希望它可以帮助你。


更新:请注意,一一发送消息会减慢该过程。例如:批量发送 100 条消息在我的电脑上耗时 0.0107 秒。一条一条发送相同的 100 条消息需要 0.0244 秒。

于 2019-09-09T01:07:10.320 回答
0

我对golang知之甚少。但是使用Writer.WriteMessages的以下函数具有同步发送选项。

快速写入(使用同步发送)实际上取决于您的网络往返时间,即,将消息发送到 Kafka 所花费的时间加上从 Kafka 获得确认所花费的时间。

如果您使用的是同步发送,那么您的发送将被阻止,直到收到确认为止。因此,为了加快速度,一种方法是减少确认。最好将其设置为 1(这意味着领导者已将消息写入其日志但未复制到追随者)。但是,如果领导者宕机并且消息没有被复制,这可能会导致丢失。

因此,您可以将其设置为acks=all并更改min.insync.replicas=2主题。值越小,send()返回的速度越快,它将下一条消息推送到 Kafka 的速度就越快。

于 2019-09-03T13:47:34.233 回答