0

我正在尝试加入

  • KStream:从一个主题创建,该主题具有 JSON 值。我使用来自该值的两个属性重新键入流。示例值(json 的片段)。我创建了一个自定义 pojo 类并使用自定义 serdes。{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}

键映射为:

map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))

我查看了 KStream 并打印了我使用的键和属性。看起来一切都很好。

  • KTable:我从一个主题创建一个 ktable,我正在使用 python 脚本写入主题,主题的关键是KYZ1ifHCInOctets设备名称和指标名称的组合(从上面)。我做了一个 toStream,然后查看生成的流。键和值似乎都很好。

现在,当我进行内部连接并查看或通过/查看某个主题时,我看到键和值不匹配。加入好像不行

  KStream<String, MyPojoClass> joined= datastream.join(table, 
          (data,table)->data
          ,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
          );

key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}

我通过 ksql 有完全相同的事情,但想做我自己的流应用程序。

4

1 回答 1

0

现在听起来很愚蠢,错误是什么,我的 PoJo 类几乎没有静态属性:-(,导致错误的键。

于 2018-10-31T00:06:38.587 回答