3

我正在尝试使用Node.jsAzure 函数中向我的Azure 存储队列添加大约 6000 条消息。

我尝试了多种方法来做到这一点,现在我将 QueueService 方法包装在 a 中,并使用Bluebird以大约 50 的并发性Promise通过 a 解决 6000 个承诺。Promise.map

const addMessages = Promise.map(messages, (msg) => {
  //returns a promise wrapping the Azure QueueService method
  return myQueueService.addMessage(msg);
}, { concurrency: 50 });

//this returns a promise that resolves when all promises have resolved.
//it rejects when one of the promises have rejected.
addMessages.then((results) => {
  console.log("SUCCESS");
}, (error) => {
  console.log("ERROR");
});

我的 QueueService 是使用ExponentialRetry策略创建的。


使用这种策略我得到了好坏参半的结果:

  • 所有消息都被添加到我的队列中,并且承诺正确解决。
  • 所有消息都被添加到我的队列中,并且承诺没有解决(或拒绝)。
  • 并非所有消息都添加到我的队列中,并且承诺不会解决(或拒绝)。

我是否遗漏了什么,或者我的电话有时需要 2 分钟才能解决,有时需要 10 分钟以上?

将来,我可能要添加大约 100.000 条消息,所以我有点担心我现在得到的不可预测的结果。

在节点(在 Azure 函数中)添加大量消息的最佳策略是什么?


编辑:

不知道我是怎么错过的,但是将消息添加到我的存储队列的一种非常可靠的方法是使用我的 Azure 函数的队列输出绑定:

https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-storage-queue#storage-queue-output-binding

也使我的代码更容易!

for (var i = 0; i < messages.length; i++) {
  // add each message to queue
  context.bindings.outputQueue.push(messages[i]);
}

编辑2:

我打算将我的消息分成大约 1000 条,并将这些批次存储在Azure Blob Storage中。

每次添加新 blob 时都可以触发另一个 Azure 函数,该函数将一次处理 1000 条消息的排队。

这应该使我的队列更加可靠和可扩展,因为我尝试通过输出绑定将 20.000 条消息添加到我的队列中,并在 5 分钟后收到 Azure 函数超时,而只能处理大约 15.000 条消息。

4

1 回答 1

2

是什么触发了这个功能?我建议不要让单个函数添加所有这些消息,而是扇出并允许这些函数扩展并通过限制它们正在执行的工作量来更好地利用并发性。

根据我上面的建议,您将拥有处理您今天使用的触发器的函数,将工作排队,然后由另一个函数处理,该函数执行添加(少得多)数量的消息的实际工作到队列。您可能需要根据您的工作负载使用数字来查看哪些方法运行良好,但这种模式将允许这些功能更好地扩展(包括跨多台机器),更好地处理故障并提高可靠性和可预测性。

例如,您可以将消息中的消息数量放在队列中以触发工作,如果您想要 1000 条消息作为最终输出,您可以将 10 条消息排队,指示您的“工作”函数每条添加 100 条消息。我还建议每个函数使用更小的数字。

我希望这有帮助!

于 2016-11-16T18:32:03.250 回答