0

我想在将 CSV 文件上传到 blob 存储时对其进行处理。对于这个要求,我正在编写带有 blob 触发器的 Web 作业。

为了确保连续的 CSV 处理,我正在编写另一个带有 blob 触发器的 Web 作业。

因此,如果一个 Web 作业失败,另一个 Web 作业将处理 csv。

现在,我的问题是当两个 Web 作业都在运行时,它们正在处理相同的 CSV 文件并最终创建重复数据。

我如何锁定文件以便只有一项网络作业会处理 CSV 文件?

或者

如果第一个 Web 作业要关闭,我如何触发第二个 Web 作业?

4

2 回答 2

2

如果第一个 Web 作业要关闭,我如何触发第二个 Web 作业?

我建议您使用 try-catch 来处理您的第一个 WebJob 中的异常。如果发生任何异常,我们可以将 blob 名称写入队列以触发其他 WebJob。

public static void ProcessCSVFile([BlobTrigger("input/{blobname}.csv")] TextReader input, [Queue("myqueue")] out string outputBlobName, string blobname)
{
    try
    {
        //process the csv file

        //if none exception occurs, set the value of outputBlobName to null
        outputBlobName = null;
    }
    catch
    {
        //add the blob name to a queue and another function named RepeatProcessCSVFile will be triggered.
        outputBlobName = blobname;
    }
}

我们可以在另一个 WebJob 中创建一个 QueueTrigger 函数。在这个函数中,我们可以读出 blob 名称并重新处理 csv。如果出现新的异常,我们还可以将 blob 名称重新添加到队列中,并且该函数将一次又一次地执行,直到 csv 文件被成功处理。

public static void RepeatProcessCSVFile([QueueTrigger("myqueue")] string blobName, [Queue("myqueue")] out string outputBlobName)
{
    try
    {
        //process the csv file

        //if none exception occurs, set the value of outputBlobName to null.
        outputBlobName = null;
    }
    catch
    {
        //re-add the blobName to the queue and this function will be executed again until the csv file has been handled successfully.
        outputBlobName = blobName;
    }
}
于 2017-05-23T04:24:51.147 回答
0

我喜欢 Amor 的解决方案,但有一些建议可以添加。

如果放弃 BlobTrigger 方法,而是将指示需要处理的 blob 的服务总线队列消息排入队列,则可以使用 ServiceBusTrigger 触发处理。如果发生异常,请放弃该消息,它将可用于另一次处理尝试。这将使您只有一个网络作业并且仍然有冗余。

使用服务总线队列的另一个优点是,您可以保证至少一次且最多一次处理以及在读取消息时保证消息锁定。这不是标准存储队列的情况。如果您想添加第二个 Webjob 实例来监控相同的服务总线队列,这也将在未来为您提供可扩展性选项。

于 2017-05-24T10:24:36.783 回答