1

我有一个 kafka 主题,主题中的每条消息都有纬度/经度和事件时间戳。创建了一个引用主题的流,并希望使用 geo_distance 计算 2 点之间的距离。例子

GpsDateTime            lat              lon
2016-11-30 22:38:36,    32.685757,  -96.735942
2016-11-30 22:39:07,    32.687347,  -96.732841
2016-11-30 22:39:37,    32.68805,   -96.729726 

我想在上面的流上创建一个新流并用距离丰富它。

GpsDateTime            lat              lon          Distance
2016-11-30 22:38:36,    32.685757,  -96.735942        0
2016-11-30 22:39:07,    32.687347,  -96.732841        0.340
2016-11-30 22:39:37,    32.68805,   -96.729726        0.302

使用 KSQL 是否可以达到预期的效果?或者如何在处理新消息时参考以前的消息?

4

1 回答 1

0

首先,这些读数是否来自某种设备?如果是这样,您是否有他们的唯一 ID (UUID)?我会把它放到你的流中,所以它会喜欢UUID, GpsDateTime, lat, lon.

您将需要创建一个相当基本的 Kafka Streams 应用程序。在这个应用程序中,您将从流中的最新读数存储到 StoreBuilder 中。然后,当从 Kafka 接收到新消息时,您将检索这个最新值,进行计算,然后将新的 lat,long 值存储到 StoreBuilder 中。

当然,我不清楚您是否只想拥有一个 lat,long 值,并且所有后续值都是从第一次读数中计算出来的。或者,如果您想进行滚动计算,您总是在比较上次和当前读数之间的距离。

无论如何,您可以在实践中看到此代码:https ://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest .java

此示例是一个字数统计示例,但可以根据您的用例快速转换。

静态最终类 WordCountTransformerSupplier(第 78 行)将成为您的 LatLongDistanceComputation。

您将创建具有适当类型的 StoreBuilder(第 154 行)(无论您将纬度/经度存储为什么)。

第 165 行是从流入的值流中实际读取项目的位置。

当然,您还需要编辑 inputTopic 和 outputTopic(第 66-67 行)以及其他一些内容。

于 2018-09-14T03:24:34.167 回答