0

我正在使用在 Dataproc 上运行的 spark 2.4 并每 15 分钟运行一次批处理作业以从 bq 表中获取一些数据,将其聚合(总和)并通过 pyspark.sql 将其存储在另一个 bq 表中(覆盖)。

如果我在 spark 中查询表,看起来数据落后了大约一个小时。或者更确切地说,它在大约一个小时前切断。如果我对在 Spark 中查询的表使用完全相同的查询,但是在 BQ Web 控制台中,所有数据都在那里并且是最新的。难道我做错了什么?或者这是连接器的预期行为?

这基本上是我正在使用的代码:

orders_by_hour_query = """
SELECT

_id as app_id,
from_utc_timestamp(DATE_TRUNC('HOUR', created_at), 'America/Los_Angeles') as ts_hour,
SUM(total_price_usd) as gmv,
COUNT(order_id) as orders

FROM `orders`

WHERE DATE(from_utc_timestamp(created_at, 'America/Los_Angeles')) BETWEEN "2020-11-23" AND "2020-11-27"

GROUP BY 1, 2

ORDER BY 1, 2 ASC
"""

orders_df = spark.read.format("bigquery").load(bq_dataset+".orders")
orders_df.createOrReplaceTempView("orders")
orders_by_hour_df = spark.sql(orders_by_hour_query)

编辑:似乎每小时的截止时间似乎几乎是任意的。例如,当前是“2020-11-25 06:31 UTC”,但通过 Spark 连接器从 BQ 查询的最大时间戳是:“2020-11-25 05:56:39 UTC”。

该表的更多信息:

Table size  2.65 GB
Long-term storage size  1.05 GB
Number of rows  4,120,280
Created Jun 3, 2020, 4:56:11 PM
Table expiration    Never
Last modified   Nov 24, 2020, 10:07:54 PM
Data location   US
Table type  Partitioned
Partitioned by  Day
Partitioned on field    created_at
Partition filter    Not required

Streaming buffer statistics

Estimated size  1.01 MB
Estimated rows  1,393
Earliest entry time Nov 24, 2020, 9:57:00 PM

提前致谢!

4

1 回答 1

0

看起来丢失的数据可能在流缓冲区中并且尚未到达 BQ 存储。

这意味着您可以直接从 BQ 查询它,但不能使用 BQ Spark 连接器,因为它适用于 Storage API ( https://cloud.google.com/bigquery/docs/reference/storage )

作为一种解决方法,您可以尝试以下方法。由于只有一个小时的数据,如果数据足够小,您也可以直接使用 BQ API,将 pandas 数据帧转换为 spark 数据帧。

`def bq2df(QUERY):
    bq = bigquery.Client()
    query_job = bq.query(QUERY)
    query_job.result()

    df = spark.read.format('bigquery') \
        .option('dataset', query_job.destination.dataset_id) \
        .load(query_job.destination.table_id) \
        .persist(StorageLevel.MEMORY_AND_DISK)

    return df
于 2020-12-08T03:20:28.660 回答