我正在尝试加入
- KStream:从一个主题创建,该主题具有 JSON 值。我使用来自该值的两个属性重新键入流。示例值(json 的片段)。我创建了一个自定义 pojo 类并使用自定义 serdes。
{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}
键映射为:
map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))
我查看了 KStream 并打印了我使用的键和属性。看起来一切都很好。
- KTable:我从一个主题创建一个 ktable,我正在使用 python 脚本写入主题,主题的关键是
KYZ1ifHCInOctets
设备名称和指标名称的组合(从上面)。我做了一个 toStream,然后查看生成的流。键和值似乎都很好。
现在,当我进行内部连接并查看或通过/查看某个主题时,我看到键和值不匹配。加入好像不行
KStream<String, MyPojoClass> joined= datastream.join(table,
(data,table)->data
,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
);
key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}
我通过 ksql 有完全相同的事情,但想做我自己的流应用程序。