问题标签 [message-hub]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 消息中心上的 Kafka Streams KTable 配置错误
此问题现已在 Message Hub 上解决
我在 Kafka 中创建 KTable 时遇到了一些麻烦。我是 Kafka 的新手,这可能是我问题的根源,但我想无论如何我都可以在这里问。我有一个项目,我想通过计算它们的总出现来跟踪不同的 ID。我正在使用IBM Cloud 上的Message Hub来管理我的主题,到目前为止它运行良好。
我有一个关于 Message Hub 的主题,它会产生类似的消息{"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}
,目前,唯一相关的关键是 ID。
我的 Kafka 代码以及 Streams 配置如下所示:
当我运行代码时,我收到以下错误:
线程“KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1”org.apache.kafka.streams.errors.StreamsException 中的异常:无法创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition .
其次是:
引起:java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.PolicyViolationException:无效配置:{segment.index.bytes=52428800,segment.bytes=52428800,cleanup.policy=delete,segment.ms =600000}。仅允许的配置:[retention.ms, cleanup.policy]
我不知道为什么会发生这个错误,以及可以做些什么。我构建 KStream 和 KTable 的方式是否不正确?或者也许是bluemix 上的消息中心?
解决了:
从我标记为正确的答案下方的评论中添加摘录。原来我的 StreamsConfig 很好,并且(目前)在 Message Hub 方面存在问题,但有一个解决方法:
事实证明,在使用 Kafka Streams 1.1 创建重新分区主题时,Message Hub 存在问题。在我们进行修复时,您需要手动创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题(myTopic)一样多的分区,并将保留时间设置为最大值。修复后我会发表另一条评论
非常感谢您的帮助!
java - Kafka Streams 在分组和聚合时使用 KTable 转换为字符串问题
我有一个带有传入消息的 Kafka 流,看起来
sensor_code: x, time: 1526978768, address: Y
我想创建一个 KTable,在每个传感器代码处存储每个唯一地址。
KTable
在哪里kvm1
,kvm2
是我自己的KeyValueMappers
。我的想法是用 , 替换现有的密钥sensor_code=x, address=y
,执行一个groupByKey()
and count()
。然后另一个groupBy(kvm2, Serialized.with(stringSerde, longSerde))
wherekvm2
修改现有的key
以包含 thesensor_code
然后值将是它的计数。但由于它不起作用,也许我做错了......它试图将其转换为 Long 并引发异常,因为它正在寻找一个字符串。我想要计数Long
,对吗?
这是KeyValueMapper
我使用其各自帮助功能的第一个:
这是第二个KeyValueMapper
及其帮助功能。
这是我尝试运行代码时返回的异常和错误。
[2018-05-29 15:28:40,119] 错误流线程 [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] 由于以下错误,无法处理流任务 2_0:(org.apache. kafka.streams.processor.internals.AssignedStreamsTasks:105) java.lang.ClassCastException: java.lang.Long 无法在 org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java :28) 在 org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178) 在 org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66) 在 org .apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57) 在 org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore。put(InnerMeteredKeyValueStore.java:198) at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process( KTableAggregate.java:95) 在 org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
注意java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
错误。
任何想法为什么会出现此错误以及如何修复它或建议如何编辑代码以达到我所提到的所需输出?
提前谢谢了!
编辑: 由于我放弃了其中一种方法,因此对我的问题进行了大修。
ibm-cloud - 创建消息中心服务时出错
创建 IBM Message Hub 服务时出现以下错误。您能帮忙检查导致此错误的原因吗?
java - 如何使用带有消息集线器的 kafkacat
在尝试将 kafkacat 与消息中心一起使用时,我使用了以下内容:
并得到以下错误:
如何使用带有提供凭据的 kerberos 创建正确的密钥,然后使用这些密钥从主题中使用?
message-hub - Message Hub & Confluent Kafka Connect S3
I have requirement to consume messages from IBM MHub topic into IBM Object Storage.
I got it working with local Kafka server with Confluent Kafka Connect S3 plugin as standalone worker for sink Amazon S3 bucket and file. Both was a success.
If I configure Confluent Kafka Connect S3 as distributed worker for IBM MHub cluster I get no errors but still no messages end up to Amazon S3 bucket. I tried file sink also, no luck either.
Is it possible at all?
apache-kafka - 错误:配置属性“security.protocol”的值“sasl_ssl”无效
我正在使用 node-rdkafka 通过以下选项连接到 IBM MessageHub:
客户端在 IBM Kubernetes Service pod (Ubuntu) 上运行。
请指教。谢谢。