7

此问题现已在 Message Hub 上解决

我在 Kafka 中创建 KTable 时遇到了一些麻烦。我是 Kafka 的新手,这可能是我问题的根源,但我想无论如何我都可以在这里问。我有一个项目,我想通过计算它们的总出现来跟踪不同的 ID。我正在使用IBM Cloud 上的Message Hub来管理我的主题,到目前为止它运行良好。

我有一个关于 Message Hub 的主题,它会产生类似的消息{"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"},目前,唯一相关的关键是 ID。

我的 Kafka 代码以及 Streams 配置如下所示:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

当我运行代码时,我收到以下错误:

线程“KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1”org.apache.kafka.streams.errors.StreamsException 中的异常:无法创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition .

其次是:

引起:java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.PolicyViolationException:无效配置:{segment.index.bytes=52428800,segment.bytes=52428800,cleanup.policy=delete,segment.ms =600000}。仅允许的配置:[retention.ms, cleanup.policy]

我不知道为什么会发生这个错误,以及可以做些什么。我构建 KStream 和 KTable 的方式是否不正确?或者也许是bluemix 上的消息中心?

解决了:

从我标记为正确的答案下方的评论中添加摘录。原来我的 StreamsConfig 很好,并且(目前)在 Message Hub 方面存在问题,但有一个解决方法:

事实证明,在使用 Kafka Streams 1.1 创建重新分区主题时,Message Hub 存在问题。在我们进行修复时,您需要手动创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题(myTopic)一样多的分区,并将保留时间设置为最大值。修复后我会发表另一条评论

非常感谢您的帮助!

4

1 回答 1

5

Message Hub对创建主题时可以使用的配置有一些限制。

从您收到的 PolicyViolationException 来看,您的 Streams 应用程序似乎尝试使用一些我们不允许的配置:

  • 段索引字节
  • 段字节
  • 段.ms

我猜你在 Streams 配置中的某个地方设置了它们,它们应该被删除。

请注意,您还需要在配置中设置为 3 才能使用我们的文档StreamsConfig.REPLICATION_FACTOR_CONFIG中提到的 Message Hub 。

于 2018-05-03T12:45:52.763 回答