1

我是 Kafka 新手,并尝试创建一个小型 Kafka KTable 实现。我已经成功添加了一个 KTable 并且能够查询。我使用了当地的州立商店,它按预期工作。以下是我的本地状态商店配置

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties) {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MessageSerdes.class.getName());
    config.put(StreamsConfig.STATE_DIR_CONFIG, directory);
    //TODO : verify error strategy
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}

现在我想使用 RPC 来使用 Global State。我对几个问题感到困惑。要添加全局状态存储,我需要添加 RPC 端点

config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "127.0.0.1:8080");

文件说

“唯一的要求是 RPC 层嵌入到 Kafka Streams 应用程序中”

  • 这是否意味着我们需要在 Kafka 应用程序中创建一个客户端端点,如果是这样,如果它是一个具有 Web 依赖关系的 Spring Boot 应用程序,它就像“localhost:8080”
  • 此应用程序的其他实例将如何仅通过 APPLICATION_SERVER_CONFIG (application.server) 连接并执行交互式查询或保持状态同步。我的意思是如何为同一应用程序的其他实例提供额外的配置以在全局状态下创建同步。
  • 如果创建了全局状态,无论出于何种原因,我们是否需要在 Mongodb 或其他地方保留备份。(容错)考虑到数据库永远不会像写入磁盘一样快,我们是否关心它还是应该依赖分布式架构

如果给出一些带有示例的 Kafka Global State Store 实现,那就太好了。

4

1 回答 1

2

首先,这不是 Global State,如果要使用 Global State,则应该构建 GlobalKtable 而不是 KTable。当您将 KTable 物化到状态存储时,您的状态存储会被分区,并且这些分区会分布在您的应用程序实例中,并且每个实例只能查询它的状态存储,因此名称为local state. 您可以通过向每个应用程序实例添加 RPC 层来访问其他实例的存储。

  1. 你的意思是服务器端点吗?是的。
  2. 卡夫卡文档指出 Kafka Streams will keep track of the RPC endpoint information for every instance of an application, its state stores, and assigned stream partitions through instances of StreamsMetadata.

使用该StreamsMetadata实例,您可以获得HostStoreInfo具有包含您要查询的键的分区的应用程序实例。

  1. 在您的情况下(您正在使用 KTable),它是本地状态,它由启用日志压缩的内部 Kafka 更改日志主题支持,因此您的本地状态是容错的,您的本地状态在启动期间使用此更改日志主题恢复,这主题有格式:
<application.id>-<your-local-state-store-name>-changelog

您可以在此处查看如何查询整个应用程序的远程状态存储的示例。

于 2020-03-16T12:02:51.817 回答