1
16:37:21.945 [Workflow Executor taskList="PullFulfillmentsTaskList", domain="test-domain": 3] WARN com.uber.cadence.internal.common.Retryer - Retrying after failure
org.apache.thrift.transport.TTransportException: Request timeout after 1993ms
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.throwOnRpcError(WorkflowServiceTChannel.java:546)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.doRemoteCall(WorkflowServiceTChannel.java:519)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.respondDecisionTaskCompleted(WorkflowServiceTChannel.java:962)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.lambda$RespondDecisionTaskCompleted$11(WorkflowServiceTChannel.java:951)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.measureRemoteCall(WorkflowServiceTChannel.java:569)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.RespondDecisionTaskCompleted(WorkflowServiceTChannel.java:949)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.lambda$sendReply$0(WorkflowWorker.java:301)
    at com.uber.cadence.internal.common.Retryer.lambda$retry$0(Retryer.java:104)
    at com.uber.cadence.internal.common.Retryer.retryWithResult(Retryer.java:122)
    at com.uber.cadence.internal.common.Retryer.retry(Retryer.java:101)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.sendReply(WorkflowWorker.java:301)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:261)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:229)
    at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:71)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

我们的父工作流代码基本上是这样的(JSONObject来自org.json)

JSONObject[] array = restActivities.getArrayWithHugeJSONItems();
for(JSONObject hugeJSON: array) {
  ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class);
  child.run(hugeJSON);
}

我们发现,大多数时候,父工作流工作者无法启动子工作流并抛出上面的超时异常。它疯狂地重试,但从未成功,并一遍又一遍地打印超时异常。然而,有时我们很幸运并且它有效。有时它甚至更早地在活动工作者中失败,并引发相同的异常。我们认为这是由于数据量太大(约 5MB),无法在超时时间内发送(从日志判断我们猜测设置为 2s)。如果我们child.run用小的假数据调用它,它 100% 有效。

我们使用子工作流的原因是我们想用来Async.function并行运行它们。那么我们该如何解决这个问题呢?是否有我们应该增加的节俭超时配置,或者我们可以避免传递大量数据?

先感谢您!

---Maxim回答后更新---

谢谢你。我阅读了示例,但对于我的用例仍有一些问题。假设我在我的 RestActivitiesWorker 中有一个包含 100 个巨大 JSON 对象的数组,如果我不应该将这个巨大的数组返回到工作流,我需要对数据库进行 100 次调用以创建 100 行记录并将 100 个 ID 放入一个数组中并且将其传递回工作流程。然后工作流为每个 ID 创建一个子工作流。然后每个子工作流调用另一个具有 id 的活动以从数据库加载数据。但是该活动必须将那个巨大的 JSON 传递给子工作流,这样可以吗?对于在数据库中进行 100 次插入的 RestActivitiesWorker,如果它在中间失败了怎么办?

我想这归结为我们的工作流程试图直接使用巨大的 JSON。我们正在尝试将巨大的 JSON(5-30MB,不是那么大)从外部系统加载到我们的系统中。我们稍微分解一下 JSON,操作一些值,并使用来自几个字段的值来执行一些不同的逻辑,最后将其保存在我们的 DB 中。我们应该如何使用 Temporal 来做到这一点?

4

1 回答 1

1

Temporal/Cadence 不支持将大 blob 作为输入和输出传递,因为它使用数据库作为底层存储。所以你想改变你的应用程序的架构来避免这种情况。

标准的解决方法是:

  • 使用外部 blob 存储来保存大数据并将引用作为参数传递。
  • 在工作进程甚至主机磁盘上缓存数据,并将对这些数据进行操作的活动路由到该进程或主机。有关此方法,请参阅文件处理示例。
于 2020-06-19T01:27:18.560 回答