8

我知道不可能在 Kafka 中对多个分区进行排序,并且分区排序只能保证组内的单个消费者(对于单个分区)。但是,使用 Kafka Streams 0.10 现在可以实现这一点吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者端,假设使用 Kafka Streams 0.10,现在这可能吗?假设我们收到所有消息,我们是否不能根据使用的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供使用?

目前我需要保持排序,但这意味着有一个分区和一个消费者线程。我想将其更改为多个分区以增加并行度,但以某种方式“让它们按顺序排列”。

有什么想法吗?谢谢你。

4

2 回答 2

15

在这种情况下,您面临两个问题:

  1. 具有多个分区的 Kafka 主题,事实上 Kafka 不保证此类多分区主题的全局排序(主题的)。
  2. 主题及其分区的迟到/乱序消息的可能性,这与时间和时间戳有关。

我知道不可能在 Kafka 中对多个分区进行排序,并且分区排序只能保证组内的单个消费者(对于单个分区)。但是,使用 Kafka Streams 0.10 现在可以实现这一点吗?

简短的回答是:不,当您从具有多个分区的 Kafka 主题中读取时,仍然无法实现全局顺序。

此外,“分区排序”是指“基于分区中消息的偏移量的分区排序”。排序保证与消息的时间戳无关。

最后,只有在以下情况下才能保证订购max.in.flight.requests.per.connection == 1

Apache Kafka 文档中的 生产者配置设置max.in.flight.requests.per.connection:(默认值5:):客户端在阻塞之前将在单个连接上发送的未确认请求的最大数量。请注意,如果此设置设置为大于 1 并且发送失败,则存在由于重试(即,如果启用重试)而导致消息重新排序的风险。

请注意,此时我们正在讨论 Kafka 中的消费者行为(这是您最初的问题的开始)和生产者行为的组合。

如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者端,假设使用 Kafka Streams 0.10,现在这可能吗?

即使使用时间戳功能,我们仍然无法实现“每个分区中的每条消息都保持顺序”。为什么?因为可能会出现迟到/乱序消息。

分区按偏移量排序,但不保证按时间戳排序。在实践中,分区的以下内容是完全可能的(时间戳通常是自时代以来的毫秒数):

Partition offsets     0    1    2    3    4    5    6    7    8
Timestamps            15   16   16   17   15   18   18   19   17
                                          ^^
                                         oops, late-arriving data!

什么是迟到/乱序消息?想象一下,你有遍布世界各地的传感器,所有这些传感器都测量它们的本地温度并将最新的测量值发送到 Kafka 主题。一些传感器的互联网连接可能不可靠,因此它们的测量结果可能会延迟几分钟、几小时甚至几天。最终,他们延迟的测量将到达卡夫卡,但他们会“迟到”。城市中的手机也是如此:有些可能会耗尽电池/能量并需要充电才能发送数据,有些可能会因为您在地下驾驶而失去互联网连接等。

假设我们收到所有消息,我们是否不能根据使用的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供使用?

理论上是的,但在实践中这是相当困难的。“我们收到所有消息”的假设实际上对于流系统来说是具有挑战性的(即使对于批处理系统也是如此,尽管这里可能会简单地忽略迟到数据的问题)。你永远不知道你是否真的收到了“所有消息”——因为可能会有迟到的数据。如果您收到迟到的消息,您希望发生什么?再次重新处理/重新排序“所有”消息(现在包括迟到的消息),还是忽略迟到的消息(从而计算不正确的结果)?从某种意义上说,通过“让我们对它们全部排序”实现的任何此类全局排序要么是非常昂贵的,要么是尽力而为。

于 2016-09-20T12:14:46.767 回答
1

我没有使用 Kafka 流 - 但可以使用普通消费者来执行此操作。

首先对分区进行排序 - 这假设您已经在每个您想要的或使用 Consumer Group 的地方寻找偏移量。

private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) {

    Set<TopicPartition> pollPartitions = events.partitions();
    List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>();
    for (TopicPartition tp : pollPartitions) {
        orderEvents.add(events.records(tp));
    }
    // order the list by the first event, each list is ordered internally also
    orderEvents.sort(new PartitionEventListComparator());
    return orderEvents;
}

/**
 * Used to sort the topic partition event lists so we get them in order
 */
private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> {

    @Override
    public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) {
        long c1 = list1.get(0).timestamp();
        long c2 = list2.get(0).timestamp();
        if (c1 < c2) {
            return -1;
        } else if (c1 > c2) {
            return 1;
        }

        return 0;
    }


}

然后只需轮询分区以按顺序获取事件 - 在实践中我发现这是可行的。

                ConsumerRecords<String, String> events = consumer.poll(500);
                int totalEvents = events.count();
                log.debug("Polling topic - recieved " + totalEvents + " events");
                if (totalEvents == 0) {
                    break;  // no more events
                }

                List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events);

                int cnt = 0;
                // Each list is removed when it is no longer needed
                while (!orderEvents.isEmpty() && sent < max) {
                    for (int j = 0; j < orderEvents.size(); j++) {
                        List<ConsumerRecord<String, String>> subList = orderEvents.get(j);
                        // The list contains no more events, or none in our time range, remove it
                        if (subList.size() < cnt + 1) {
                            orderEvents.remove(j);
                            log.debug("exhausted partition - removed");
                            j--;
                            continue;
                        }
                        ConsumerRecord<String, String> event = subList.get(cnt);
                        cnt++
}
于 2016-09-19T18:42:03.863 回答