4

我正在从 Kafka 主题中获取数据并将它们存储为 Deltalake(parquet) 格式。我希望找到特定日期获取的消息数

我的思考过程:我想使用 spark 读取数据以 parquet 格式存储的目录,并在特定日期对带有“.parquet”的文件应用计数。这会返回一个计数,但我不确定这是否是正确的方法。

这种方式正确吗?有没有其他方法可以计算在特定日期(或持续时间)从 Kafka 主题获取的消息数量?

4

3 回答 3

0

我们从主题消费的消息不仅有键值,还有其他信息,比如时间戳

可用于跟踪消费者流量。

Timestamp Timestamp 由 Broker 或 Producer 根据 Topic 配置更新。如果 Topic 配置的时间戳类型为 CREATE_TIME,则 broker 将使用生产者记录中的时间戳,而如果 Topic 配置为 LOG_APPEND_TIME ,则在附加记录时,时间戳将由 broker 使用 broker 本地时间覆盖。

  1. 因此,如果您要存储任何位置,如果您保留时间戳,则可以很好地跟踪每天或每小时的消息率。

  2. 其他方式,您可以使用一些 Kafka 仪表板,如 Confluent Control Center(许可证价格)或 Grafana(免费)或任何其他工具来跟踪消息流。

  3. 在我们的案例中,在消费消息和存储或处理的同时,我们还将消息的元详细信息路由到 Elastic Search,我们可以通过 Kibana 将其可视化。

于 2019-10-23T22:46:51.630 回答
0

在不计算两个版本之间的行数的情况下检索此信息的另一种方法是使用Delta 表历史记录。这样做有几个优点 - 您无需读取整个数据集,您也可以考虑更新和删除,例如,如果您正在执行 MERGE 操作(无法.count在不同版本上进行比较,因为更新正在替换实际值,或删除该行)。

例如,对于仅追加,以下代码将计算所有由正常append操作写入的插入行(对于其他事情,例如 MERGE/UPDATE/DELETE,我们可能需要查看其他指标):

from delta.tables import *

df = DeltaTable.forName(spark, "ml_versioning.airbnb").history()\
  .filter("timestamp > 'begin_of_day' and timestamp < 'end_of_day'")\
  .selectExpr("cast(nvl(element_at(operationMetrics, 'numOutputRows'), '0') as long) as rows")\
  .groupBy().sum()
于 2021-04-20T09:47:07.243 回答
0

您可以利用 Delta Lake 提供的“时间旅行”功能。

在你的情况下,你可以做

// define location of delta table
val deltaPath = "file:///tmp/delta/table"

// travel back in time to the start and end of the day using the option 'timestampAsOf'
val countStart = spark.read.format("delta").option("timestampAsOf", "2021-04-19 00:00:00").load(deltaPath).count()
val countEnd = spark.read.format("delta").option("timestampAsOf", "2021-04-19 23:59:59").load(deltaPath).count()

// print out the number of messages stored in Delta Table within one day
println(countEnd - countStart)

请参阅有关查询表的旧快照(时间旅行)的文档。

于 2019-10-23T21:58:02.637 回答