我有一个带有传入消息的 Kafka 流,看起来
sensor_code: x, time: 1526978768, address: Y
我想创建一个 KTable,在每个传感器代码处存储每个唯一地址。
KTable
KTable<String, Long> numCount = streams
.map(kvm1)
.groupByKey(Serialized.with(stringSerde, stringSerde))
.count()
.groupBy(kvm2, Serialized.with(stringSerde, longSerde))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));
在哪里kvm1
,kvm2
是我自己的KeyValueMappers
。我的想法是用 , 替换现有的密钥sensor_code=x, address=y
,执行一个groupByKey()
and count()
。然后另一个groupBy(kvm2, Serialized.with(stringSerde, longSerde))
wherekvm2
修改现有的key
以包含 thesensor_code
然后值将是它的计数。但由于它不起作用,也许我做错了......它试图将其转换为 Long 并引发异常,因为它正在寻找一个字符串。我想要计数Long
,对吗?
这是KeyValueMapper
我使用其各自帮助功能的第一个:
private static String getKeySensorIdAddress(String o) {
String x = "sensor_id=\"x\", address=\"y\"";
try {
WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
x = x.replace("x", event.getSensor_code());
x = x.replace("y", event.getAddress());
return x;
} catch(Exception ex) {
System.out.println("Error... " + ex);
return "Error";
}
}
//KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 =
new KeyValueMapper<String, String, KeyValue<String, String>>() {
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(getKeySensorIdAddress(value), value);
}
};
这是第二个KeyValueMapper
及其帮助功能。
private static String getKeySensorId(String o) {
int a = o.indexOf(",");
return o.substring(0,a);
}
//KeyValueMapper2
KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 =
new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
public KeyValue<String, Long> apply(String key, Long value) {
return new KeyValue<>(getKeySensorId(key), value);
}
};
这是我尝试运行代码时返回的异常和错误。
[2018-05-29 15:28:40,119] 错误流线程 [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] 由于以下错误,无法处理流任务 2_0:(org.apache. kafka.streams.processor.internals.AssignedStreamsTasks:105) java.lang.ClassCastException: java.lang.Long 无法在 org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java :28) 在 org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178) 在 org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66) 在 org .apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57) 在 org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore。put(InnerMeteredKeyValueStore.java:198) at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process( KTableAggregate.java:95) 在 org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
注意java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
错误。
任何想法为什么会出现此错误以及如何修复它或建议如何编辑代码以达到我所提到的所需输出?
提前谢谢了!
编辑: 由于我放弃了其中一种方法,因此对我的问题进行了大修。