我是 Kafka 新手,并尝试创建一个小型 Kafka KTable 实现。我已经成功添加了一个 KTable 并且能够查询。我使用了当地的州立商店,它按预期工作。以下是我的本地状态商店配置
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties) {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MessageSerdes.class.getName());
config.put(StreamsConfig.STATE_DIR_CONFIG, directory);
//TODO : verify error strategy
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
return new KafkaStreamsConfiguration(config);
}
现在我想使用 RPC 来使用 Global State。我对几个问题感到困惑。要添加全局状态存储,我需要添加 RPC 端点
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "127.0.0.1:8080");
文件说
“唯一的要求是 RPC 层嵌入到 Kafka Streams 应用程序中”
- 这是否意味着我们需要在 Kafka 应用程序中创建一个客户端端点,如果是这样,如果它是一个具有 Web 依赖关系的 Spring Boot 应用程序,它就像“localhost:8080”
- 此应用程序的其他实例将如何仅通过 APPLICATION_SERVER_CONFIG (application.server) 连接并执行交互式查询或保持状态同步。我的意思是如何为同一应用程序的其他实例提供额外的配置以在全局状态下创建同步。
- 如果创建了全局状态,无论出于何种原因,我们是否需要在 Mongodb 或其他地方保留备份。(容错)考虑到数据库永远不会像写入磁盘一样快,我们是否关心它还是应该依赖分布式架构
如果给出一些带有示例的 Kafka Global State Store 实现,那就太好了。