3

Quarkus应用程序中,我需要将 tombstone 消息发布到压缩的 Apache Kafka 主题。由于我的用例是必要的,因此我使用 anEmitter来向主题发送消息(如 quarkus 博客中所建议的那样)。非墓碑消息(带有效负载)的代码是:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<MyDataStructure> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
    OutgoingKafkaRecordMetadata<String> metadata =
        OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(myData.getTopicKey())
            .build();
    return CompletableFuture.runAsync(
        () -> emitter.send(Message.of(myData).addMetadata(metadata)));
  }
}

我希望这些工具可以让我制作一个带有Emitter有效载荷的墓碑消息。不幸的是,工厂方法的所有实现,允许提供元数据(需要提供消息密钥),指定有效负载,不能是 {@code null}<M extends Message<? extends T>> void send(M msg)MessagenullMessage.of(..)

使用Emitter

4

1 回答 1

3

我建议使用Record该类(请参阅文档)。ARecord是key/value对,代表要写入的Kafka记录的key和value。两者都可以null,但在您的情况下,只有值部分应该是null: Record.of(key, null);

因此,您需要将 Emitter 的类型更改为Record<Key, Value>,例如:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<Record<Key, MyDataStructure>> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
      return emitter.send(Record.of(myData.getTopicKey(), null);
  }
}

虽然runAsync很方便,但发射器已经是异步的。所以,没有必要使用它。此外,容器中的行为可能非常显着(如果您的并行度低于 2)。

我的代码返回了send方法的结果,即CompletionStage. 当记录写入 Kafka(并由代理确认)时,该阶段将完成。

于 2021-05-19T14:04:41.277 回答