问题标签 [message-hub]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
836 浏览

java - 消息中心上的 Kafka Streams KTable 配置错误

此问题现已在 Message Hub 上解决

我在 Kafka 中创建 KTable 时遇到了一些麻烦。我是 Kafka 的新手,这可能是我问题的根源,但我想无论如何我都可以在这里问。我有一个项目,我想通过计算它们的总出现来跟踪不同的 ID。我正在使用IBM Cloud 上的Message Hub来管理我的主题,到目前为止它运行良好。

我有一个关于 Message Hub 的主题,它会产生类似的消息{"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"},目前,唯一相关的关键是 ID。

我的 Kafka 代码以及 Streams 配置如下所示:

当我运行代码时,我收到以下错误:

线程“KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1”org.apache.kafka.streams.errors.StreamsException 中的异常:无法创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition .

其次是:

引起:java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.PolicyViolationException:无效配置:{segment.index.bytes=52428800,segment.bytes=52428800,cleanup.policy=delete,segment.ms =600000}。仅允许的配置:[retention.ms, cleanup.policy]

我不知道为什么会发生这个错误,以及可以做些什么。我构建 KStream 和 KTable 的方式是否不正确?或者也许是bluemix 上的消息中心?

解决了:

从我标记为正确的答案下方的评论中添加摘录。原来我的 StreamsConfig 很好,并且(目前)在 Message Hub 方面存在问题,但有一个解决方法:

事实证明,在使用 Kafka Streams 1.1 创建重新分区主题时,Message Hub 存在问题。在我们进行修复时,您需要手动创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题(myTopic)一样多的分区,并将保留时间设置为最大值。修复后我会发表另一条评论

非常感谢您的帮助!

0 投票
1 回答
1723 浏览

java - Kafka Streams 在分组和聚合时使用 KTable 转换为字符串问题

我有一个带有传入消息的 Kafka 流,看起来 sensor_code: x, time: 1526978768, address: Y 我想创建一个 KTable,在每个传感器代码处存储每个唯一地址。

KTable

在哪里kvm1kvm2是我自己的KeyValueMappers。我的想法是用 , 替换现有的密钥sensor_code=x, address=y,执行一个groupByKey()and count()。然后另一个groupBy(kvm2, Serialized.with(stringSerde, longSerde))wherekvm2修改现有的key以包含 thesensor_code然后值将是它的计数。但由于它不起作用,也许我做错了......它试图将其转换为 Long 并引发异常,因为它正在寻找一个字符串。我想要计数Long,对吗?

这是KeyValueMapper我使用其各自帮助功能的第一个:

这是第二个KeyValueMapper及其帮助功能。

这是我尝试运行代码时返回的异常和错误。

[2018-05-29 15:28:40,119] 错误流线程 [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] 由于以下错误,无法处理流任务 2_0:(org.apache. kafka.streams.processor.internals.AssignedStreamsTasks:105) java.lang.ClassCastException: java.lang.Long 无法在 org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java :28) 在 org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178) 在 org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66) 在 org .apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57) 在 org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore。put(InnerMeteredKeyValueStore.java:198) at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process( KTableAggregate.java:95) 在 org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)

注意java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String错误。

任何想法为什么会出现此错误以及如何修复它或建议如何编辑代码以达到我所提到的所需输出?

提前谢谢了!

编辑: 由于我放弃了其中一种方法,因此对我的问题进行了大修。

0 投票
1 回答
428 浏览

ibm-cloud - 配置 Apache NiFi 以将消息发送到 Message Hub

我需要设置哪些属性才能开始将消息从 Apache NiFiPublishKafka处理器发送到 IBM Cloud 中的 Message Hub?

在此处输入图像描述

我已输入这些值以供参考

在此处输入图像描述

我在 Ubuntu 18.04 上这样做

0 投票
1 回答
53 浏览

ibm-cloud - 创建消息中心服务时出错

创建 IBM Message Hub 服务时出现以下错误。您能帮忙检查导致此错误的原因吗?

0 投票
1 回答
1784 浏览

java - 如何使用带有消息集线器的 kafkacat

在尝试将 kafkacat 与消息中心一起使用时,我使用了以下内容:

并得到以下错误:

如何使用带有提供凭据的 kerberos 创建正确的密钥,然后使用这些密钥从主题中使用?

0 投票
2 回答
290 浏览

message-hub - Message Hub & Confluent Kafka Connect S3

I have requirement to consume messages from IBM MHub topic into IBM Object Storage.

I got it working with local Kafka server with Confluent Kafka Connect S3 plugin as standalone worker for sink Amazon S3 bucket and file. Both was a success.

If I configure Confluent Kafka Connect S3 as distributed worker for IBM MHub cluster I get no errors but still no messages end up to Amazon S3 bucket. I tried file sink also, no luck either.

Is it possible at all?

0 投票
1 回答
3155 浏览

apache-kafka - 错误:配置属性“security.protocol”的值“sasl_ssl”无效

我正在使用 node-rdkafka 通过以下选项连接到 IBM MessageHub:

客户端在 IBM Kubernetes Service pod (Ubuntu) 上运行。
请指教。谢谢。