0

我想将文件接收到 Google Cloud Storage 存储桶中,并为每个文件只运行一次 Python 作业。我希望同时运行许多这样的 Python 作业,以便并行处理许多文件,但每个文件应该只处理一次。

我考虑了以下几点:

发布/订阅消息

为存储桶上的 OBJECT_FINALIZE 事件生成 Pub/Sub 消息。这里的问题是 Pub/Sub可能会多次传递消息,因此侦听同一订阅的 Python 作业池可能会为同一消息运行多个作业,所以我可以...

  1. 使用 Dataflow 对消息进行重复数据删除,但在我的非流式用例中,dataflow 似乎代价高昂,而且这个答案似乎表明它不是适合这项工作的工具。

或者

  1. 使用事务数据库(例如 Cloud SQL 上的 PostgreSQL)创建锁定机制。任何收到消息的作业都可以尝试获取与文件同名的锁,任何未能获取锁的作业都可以终止并且不确认消息,并且任何具有锁的作业可以继续处理并将锁标记为已完成以防止将来获取该锁。

我认为 2 会起作用,但它也感觉过度设计。

轮询

让作业轮询存储桶中的新文件,而不是使用 Pub/Sub。

这感觉就像它只是用一个仍然需要锁定机制的不太健壮的解决方案替换 Pub/Sub。

事件弧

使用Eventarc触发保存我的代码的 Cloud Run 容器。这似乎类似于 Pub/Sub,而且更简单,但我找不到 Eventarc 如何处理重试之类的事情的解释,或者它是否带有任何一次性保证。

单个控制器产生多个工人

创建一个中央控制器进程来处理文件事件的重复数据删除(通过 Pub/Sub、轮询或 Eventarc 接收),然后生成工作作业并将每个文件准确地分配给工作作业一次。

我认为这也可行,但会产生单点故障并可能造成吞吐量瓶颈。

4

1 回答 1

0

您走在正确的轨道上,是的,PubSub Push 消息可能会多次传递。

一种简单的管理技术是在开始处理文件时重命名文件。重命名是一个原子事务,所以如果它成功了,你就可以处理它。

    PROC_PRF = "processing"
    bucketName = # get it from the message
    fileName = # Get it from the message)

    # Renaming of the file below trriggers another google.storage.object.finalize event
    if PROC_PRF in fileName:
        print("Exiting due to rename event")
        # Ack the message an exit
        return

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketName)
    blob = bucket.get_blob(fileName)

    try:
        newBlob = bucket.rename_blob(blob,new_name = fileName+'.'+PROC_PRF)
    except:
        raise RuntimeError("Error: File rename from " + fileName + " failed, is this a duplicate function call?")

    # The rename worked - process the file & message
于 2021-09-13T21:40:04.887 回答