8

我将 Kafka 提交策略设置为最新并且缺少前几条消息。如果我在开始将消息发送到输入主题之前休眠 20 秒,那么一切都按预期工作。我不确定问题是否与消费者花费很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好?

4

6 回答 6

2
  • 您可以使用consumer.assignment(),它将返回一组分区并验证是否分配了所有可用于该主题的分区。

  • 如果您使用的是 spring-kafka 项目,您可以包含 spring-kafka-test 依赖并使用以下方法等待主题分配,但您需要有容器。 ContainerTestUtils.waitForAssignment(Object container, int partitions);

于 2018-01-09T23:07:38.110 回答
1

感谢 Alexey(我也投了赞成票),我似乎基本上按照相同的想法解决了我的问题。

只是想分享我的经验......在我们的例子中,我们以请求和响应的方式使用 Kafka,有点像 RPC。请求正在发送一个主题,然后等待另一个主题的响应。遇到类似的问题,即错过了第一反应。

我已经... KafkaConsumer.assignment();反复尝试(使用Thread.sleep(100);),但似乎没有帮助。添加 aKafkaConsumer.poll(50);似乎已经启动了消费者(组)并收到了第一个响应。测试了几次,它现在一直在工作。

顺便说一句,测试需要停止应用程序并删除 Kafka 主题,并且为了一个好的措施,也重新启动了 Kafka。

PS:像 Alexey 提到的那样,只是在poll(50);没有获取逻辑的情况下调用,可能无法保证消费者(组)已准备好。assignment();

于 2018-08-11T16:01:27.443 回答
1

您可以执行以下操作:

我有一个从 kafka 主题读取数据的测试。
所以你不能在多线程环境中使用KafkaConsumer,但是你可以传递参数“AtomicReference assignment”,在消费者线程中更新,在另一个线程中读取。

例如,在项目中截取工作代码进行测试:

    private void readAvro(String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      String bootstrapServers,
                      int readTimeout) {
    // print the topic name
    AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
    new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();

    long startTime = System.currentTimeMillis();
    long maxWaitingTime = 30_000;
    for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
        Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
        System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
        if (assignments.size() > 0) {
            break;
        }
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            needStop.set(true);
            break;
        }
    }
    System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
}

private void readAvro(String bootstrapServers,
                      String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      int readTimeout,
                      AtomicReference<Set<TopicPartition>> assignment) {

    KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
    System.out.println("Subscribed to topic: " + readFromKafka);
    consumer.subscribe(Collections.singletonList(readFromKafka));

    long started = System.currentTimeMillis();
    while (!needStop.get()) {
        assignment.set(consumer.assignment());
        ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
        events.addAll(CommonUtils4Tst.readEvents(records));

        if (readTimeout == -1) {
            if (events.size() > 0) {
                break;
            }
        } else if (System.currentTimeMillis() - started > readTimeout) {
            break;
        }
    }

    needStop.set(true);

    synchronized (MainTest.class) {
        MainTest.class.notifyAll();
    }
    consumer.close();
}

PS
needStop - 全局标志,在成功
事件失败的情况下停止所有正在运行的线程- 对象列表,我想检查
readTimeout - 我们将等待多长时间才能读取所有数据,如果 readTimeout == -1,然后当我们读到任何东西时停下来

于 2018-08-08T15:17:46.797 回答
0

您可以修改AlwaysSeekToEndListener(仅侦听新消息)以包含回调:

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
    private final Consumer<K, V> consumer;
    private Runnable callback;

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public AlwaysSeekToEndListener(Consumer<K, V> consumer, Runnable callback) {
        this.consumer = consumer;
        this.callback = callback;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
        if (callback != null) {
            callback.run();
        }
    }
}

并订阅一个闩锁回调:

CountDownLatch initLatch = new CountDownLatch(1);

consumer.subscribe(singletonList(topic), new AlwaysSeekToEndListener<>(consumer, () -> initLatch.countDown()));

initLatch.await(); // blocks until consumer is ready and listening

然后继续启动您的生产者。

于 2021-06-30T07:21:06.500 回答
0

在进行一些测试之前,我需要知道 kafka 消费者是否准备好,所以我尝试使用consumer.assignment(),但它只返回分配的分区集,但是有一个问题,我看不到这个分区是否分配到组已经设置了偏移量,所以后来当我尝试使用消费者时,它没有正确设置偏移量。

解决方案是使用commit(),这将为您提供您放入参数中的给定分区的最后提交偏移量。

因此,您可以执行以下操作:consumer.committed(consumer.assignment())

如果还没有分配分区,它将返回:

{}

如果分配了分区,但还没有偏移:

{name.of.topic-0=null, name.of.topic-1=null}

但是如果有分区和偏移量:

{name.of.topic-0=OffsetAndMetadata{offset=5197881, leaderEpoch=null, metadata=''}, name.of.topic-1=OffsetAndMetadata{offset=5198832, leaderEpoch=null, metadata=''}}

有了这些信息,您可以使用以下内容:

consumer.committed(consumer.assignment()).isEmpty();
consumer.committed(consumer.assignment()).containsValue(null);

有了这些信息,你就可以确定 kafka 消费者已经准备好了。

于 2022-03-04T20:33:32.857 回答
0

如果您的策略设置为最新(如果没有以前提交的偏移量则生效)但您没有以前提交的偏移量,那么您不应该担心“丢失”消息,因为您告诉 Kafka 不要关心那些已“先前”发送给准备就绪的消费者。

如果您关心“以前的”消息,则应将策略设置为最早。

在任何情况下,无论采用何种策略,您看到的行为都是暂时的,即一旦提交的偏移量保存在 Kafka 中,每次重新启动时,消费者都会从他们之前离开的地方开始

于 2018-01-03T10:06:52.953 回答