2

我有一个 Mapper Pipeline,我以 Google 的 App Engine 上的 log2bq 示例为模型。

细分:

  1. 我将要处理的文件上传到 Blobstore
  2. 我使用 Mapper 管道迭代文件,对其进行一些处理,并使用输出编写器将结果以 CSV 格式写入 Google Storage
  3. 然后我提交一个 BigQuery 作业以使用新创建的文件填充到表中
  4. 我查询表并将查询的输出写入 Blobstore
  5. 用户下载最终结果

这一切都是通过使用许多管道来完成的。我的问题是,如果我在第 4 步之后启用在第 2 步中创建的文件的删除,那么我的 BigQuery 表不会更新。我尝试了两种方法:

  1. 我创建了一个清理管道,在两个主要管道的末尾运行,只删除创建的文件
  2. 我试图在第 4 步结束时删除文件(在同一管道中)

您可以在下面的代码中看到这两种方法都被注释掉了。如果我尝试删除文件,为什么 BigQuery 不更新?我删除它太快了吗?听起来好像是这样,但我很确定运行删除函数的代码只有在输出文件写入 Blobstore 后才会运行。任何帮助将非常感激!

我的主要管道Blobstore2BigQueryMaster如下

class Blobstore2GoogleStorage(base_handler.PipelineBase):
    def run(self, blobkey, bucket_name):
        yield mapreduce_pipeline.MapperPipeline(
            "<NAME>",
            "<MAPPER>",
            "mapreduce.input_readers.BlobstoreLineInputReader",
            "mapreduce.output_writers.FileOutputWriter",
            params={
                "input_reader" : {
                    "blob_keys": blobkey,
                },
                "output_writer": {
                    "filesystem": "gs",
                    "gs_bucket_name": bucket_name,
                    "mime_type": "text/csv",
                }
            },
            shards=24)

class GoogleStorage2BigQuery(base_handler.PipelineBase):
    def run(self, file_names, filekey):
        bq = bigquery.BigQueryApi()
        gspaths = [f.replace('/gs/', 'gs://') for f in file_names]

        result = bq.submit_job(jobData(<TABLE_NAME>, gspaths))
        yield BigQueryImportCheck(result['jobReference']['jobId'], filekey, file_names)


class BigQueryImportCheck(base_handler.PipelineBase):
    def run(self, job, filekey, file_names):
        bq = bigquery.BigQueryApi()
        status = bq.get_job(job_id=job)

        if status['status']['state'] == 'PENDING' or status['status']['state'] == 'RUNNING':
            delay = yield pipeline.common.Delay(seconds=1)
            with pipeline.After(delay):
                yield BigQueryImportCheck(job, filekey, file_names)

        yield BigQueryExport(filekey, file_names)  


class QueryCompletionCheck(base_handler.PipelineBase):
    def run(self, job):  
        bq = bigquery.BigQueryApi()
        status = bq.get_job(job_id=job)
        if status['status']['state'] == 'PENDING' or status['status']['state'] == 'RUNNING':
            delay = yield pipeline.common.Delay(seconds=1)
            with pipeline.After(delay):
                yield QueryCompletionCheck(job)

        yield pipeline.common.Return(status)


class BigQueryExport(base_handler.PipelineBase):
    def run(self, filekey, file_names):        
        bq = bigquery.BigQueryApi()
        #Submit Job to BigQuery Here
        #<REMOVED>

        response = yield QueryCompletionCheck(insertResponse['jobReference']['jobId'])

        #WRITE QUERY RESPONSE TO BLOBSTORE HERE
        #<REMOVED>

        #files.delete(",".join(file_names))
        yield pipeline.common.Return(response)

class Blobstore2BigQueryMaster(base_handler.PipelineBase):
    def run(self, filekey, blobkey, bucket_name):
        file_names = yield Blobstore2GoogleStorage(blobkey, bucket_name)
        synchronize = yield GoogleStorage2BigQuery(file_names, filekey)
        yield CleanupCloudStorage(synchronize, file_names)


class CleanupCloudStorage(base_handler.PipelineBase):
    def run(self, synchronize, file_names):
        #files.delete(",".join(file_names))
        yield pipeline.common.Return('Done')

def jobData(tableId, sourceUris):
    #Configuration for the BigQuery Job here including
    #'createDisposition':'CREATE_IF_NEEDED'
    #'writeDisposition':'WRITE_TRUNCATE'
4

1 回答 1

1

BigQuery 加载作业完成后,您可以安全地从 Google Cloud Storage 中删除 CSV 文件。

我建议检查已完成的作业是否有错误,以查看是否可以揭示导入失败的原因,如果可能,请确认您的导入完成检查不会过早返回。

于 2013-01-18T02:02:35.803 回答