我是 Golang 和 Kafka 的新手,我正在使用 segmentio kafka-go 使用 Golang 连接到 Kafka 服务器。到目前为止,我想在 Kafka 中推送用户的每个事件,所以我想推送单个消息(而不是批量),但是由于该库提供的写操作对于批量或单个消息都需要相同的时间,所以它需要很多时间。有什么方法可以快速编写单个消息,以便我可以在更短的时间内在 kafka 中推送数百万个事件?
我已经针对单个消息和批处理消息对其进行了测试,它需要相同的时间(最短为 10 毫秒)。
我是 Golang 和 Kafka 的新手,我正在使用 segmentio kafka-go 使用 Golang 连接到 Kafka 服务器。到目前为止,我想在 Kafka 中推送用户的每个事件,所以我想推送单个消息(而不是批量),但是由于该库提供的写操作对于批量或单个消息都需要相同的时间,所以它需要很多时间。有什么方法可以快速编写单个消息,以便我可以在更短的时间内在 kafka 中推送数百万个事件?
我已经针对单个消息和批处理消息对其进行了测试,它需要相同的时间(最短为 10 毫秒)。
我认为您的问题只是 WriterConfig。
例如,如果您的配置看起来像 segmentio/kafka-go 文档中的示例:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
})
您可以尝试设置批量大小和批量超时:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
BatchSize: 1,
BatchTimeout: 10 * time.Millisecond,
})
发生这种情况是因为 kafka-go 默认等待 1 秒,直到批处理达到最大大小,默认为 100 条消息,正如我们在代码中看到的那样。
希望它可以帮助你。
更新:请注意,一一发送消息会减慢该过程。例如:批量发送 100 条消息在我的电脑上耗时 0.0107 秒。一条一条发送相同的 100 条消息需要 0.0244 秒。
我对golang知之甚少。但是使用Writer.WriteMessages的以下函数具有同步发送选项。
快速写入(使用同步发送)实际上取决于您的网络往返时间,即,将消息发送到 Kafka 所花费的时间加上从 Kafka 获得确认所花费的时间。
如果您使用的是同步发送,那么您的发送将被阻止,直到收到确认为止。因此,为了加快速度,一种方法是减少确认。最好将其设置为 1(这意味着领导者已将消息写入其日志但未复制到追随者)。但是,如果领导者宕机并且消息没有被复制,这可能会导致丢失。
因此,您可以将其设置为acks=all
并更改min.insync.replicas=2
主题。值越小,send()
返回的速度越快,它将下一条消息推送到 Kafka 的速度就越快。