0

我使用 AWS 提供的 java 流框架构建了一个工作流。我创建了 4 个活动。第一个活动等待信号开始。然后所有活动使用 Promise<> 对象同步执行。工作流程实现代码如下 -

public class PaginationWorkflowImpl implements PaginationWorkflow 
{  
private ManualUploadClient operations0 = new ManualUploadClientImpl();
   private DownloadActivityClient operations1 = new DownloadActivityClientImpl();
   private ConvertActivityClient operations2 = new ConvertActivityClientImpl();
   private UploadActivityClient operations3 = new UploadActivityClientImpl();
   final Settable<String> result = new Settable<String>();

   public void paginate() 
   {
     Promise<String> UDone = operations0.Upload(result);
     Promise<String> dnDone = operations1.s3Download(UDone);
     Promise<String> convDone = operations2.pdfToTiff(dnDone);
     operations3.s3Upload(convDone);
   }

   @Override
   public void signal1(String data)  {
      // result.set(data);
       //result.Void();
       Promise<String> ready = Promise.asPromise("ready");
       result.chain(ready);
   }


}

这里活动上传等待对象结果处于就绪状态。因此,当我向工作流发出信号时,方法 signal1 启动并将对象置于就绪状态。但是,一旦我向工作流发出信号,工作流执行就会失败。

我正在使用 nodejs aws api 来表示工作流。以下是相同的代码 -

var AWS = require('aws-sdk');
AWS.config.update({accessKeyId: '', secretAccessKey: ''});
AWS.config.update({region: 'us-east-1'});

var swf = new AWS.SWF();
var params = {
  domain: 'HWdemo2', /* required */
  signalName: 'signal1', /* required */
  workflowId: 'PaginationWorkflow', /* required */
  //input: 'true'
  //runId: 'STRING_VALUE'
};
swf.signalWorkflowExecution(params, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

我在 AWS 工作流事件控制台中显示的执行失败错误如下 -

["java.util.concurrent.CancellationException", {
  "cause": ["java.lang.NullPointerException", {
    "cause": null,
    "stackTrace": [{
      "methodName": "<init>",
      "fileName": null,
      "lineNumber": -1,
      "className": "java.io.StringReader",
      "nativeMethod": false
    }, {
      "methodName": "createParser",
      "fileName": "JsonFactory.java",
      "lineNumber": 835,
      "className": "com.fasterxml.jackson.core.JsonFactory",
      "nativeMethod": false
    }, {
      "methodName": "readValue",
      "fileName": "ObjectMapper.java",
      "lineNumber": 2098,
      "className": "com.fasterxml.jackson.databind.ObjectMapper",
      "nativeMethod": false
    }, {
      "methodName": "fromData",
      "fileName": "JsonDataConverter.java",
      "lineNumber": 96,
      "className": "com.amazonaws.services.simpleworkflow.flow.JsonDataConverter",
      "nativeMethod": false
    }, {
      "methodName": "signalRecieved",
      "fileName": "POJOWorkflowDefinition.java",
      "lineNumber": 111,
      "className": "com.amazonaws.services.simpleworkflow.flow.pojo.POJOWorkflowDefinition",
      "nativeMethod": false
    }, {
      "methodName": "doExecute",
      "fileName": "AsyncDecider.java",
      "lineNumber": 417,
      "className": "com.amazonaws.services.simpleworkflow.flow.worker.AsyncDecider$1",
      "nativeMethod": false
    }, {
      "methodName": "",
      "fileName": "",
      "lineNumber": 0,
      "className": "--- continuation ---",
      "nativeMethod": false
    }, {
      "methodName": "handleWorkflowExecutionSignaled",
      "fileName": "AsyncDecider.java",
      "lineNumber": 413,
      "className": "com.amazonaws.services.simpleworkflow.flow.worker.AsyncDecider",
      "nativeMethod": false
    }, {
      "methodName": "processEvent",
      "fileName": "AsyncDecider.java",
      "lineNumber": 251,
      "className": "com.amazonaws.services.simpleworkflow.flow.worker.AsyncDecider",
      "nativeMethod": false
    }, {
      "methodName": "decide",
      "fileName": "AsyncDecider.java",
      "lineNumber": 496,
      "className": "com.amazonaws.services.simpleworkflow.flow.worker.AsyncDecider",
      "nativeMethod": false
    }, {
      "methodName": "handleDecisionTask",
      "fileName": "AsyncDecisionTaskHandler.java",
      "lineNumber": 50,
      "className": "com.amazonaws.services.simpleworkflow.flow.worker.AsyncDecisionTaskHandler",
      "nativeMethod": false
    }, {
      "methodName": "pollAndProcessSingleTask",
      "fileName": "DecisionTaskPoller.java",
      "lineNumber": 201,
      "className": "com.amazonaws.services.simpleworkflow.flow.worker.DecisionTaskPoller",
      "nativeMethod": false
    }, {
      "methodName": "run",
      "fileName": "GenericWorker.java",
      "lineNumber": 94,
      "className": "com.amazonaws.services.simpleworkflow.flow.worker.GenericWorker$PollServiceTask",
      "nativeMethod": false
    }, {
      "methodName": "runWorker",
      "fileName": null,
      "lineNumber": -1,
      "className": "java.util.concurrent.ThreadPoolExecutor",
      "nativeMethod": false
    }, {
      "methodName": "run",
      "fileName": null,
      "lineNumber": -1,
      "className": "java.util.concurrent.ThreadPoolExecutor$Worker",
      "nativeMethod": false
    }, {
      "methodName": "run",
      "fileName": null,
      "lineNumber": -1,
      "className": "java.lang.Thread",
      "nativeMethod": false
    }],
    "message": null,
    "localizedMessage": null,
    "suppressed": ["[Ljava.lang.Throwable;", []]
  }],
  "stackTrace": [{
    "methodName": "execute",
    "fileName": "POJOWorkflowDefinition.java",
    "lineNumber": 66,
    "className": "com.amazonaws.services.simpleworkflow.flow.pojo.POJOWorkflowDefinition",
    "nativeMethod": false
  }, {
    "methodName": "doAsync",
    "fileName": "AsyncDecider.java",
    "lineNumber": 70,
    "className": "com.amazonaws.services.simpleworkflow.flow.worker.AsyncDecider$WorkflowExecuteAsyncScope",
    "nativeMethod": false
  }],
  "message": null,
  "localizedMessage": null,
  "suppressed": ["[Ljava.lang.Throwable;", []]
}]

谁能帮我解决这个错误,非常感谢。

4

2 回答 2

0

默认情况下,如果无法反序列化信号参数,工作流执行将失败。默认情况下, JsonDataConverter由在Jackson之上实现的工作流决策者使用。它需要特定格式的 JSON 文档。学习这种格式的最简单方法是使用生成的 Java 接口发送信号并查看工作流历史记录。然后你可以在你的 JavaScript 代码中重现它。

于 2015-03-30T18:30:45.907 回答
0

工作流需要一个对象数组的输入,这些对象可以转换为 json 格式的字符串。

于 2015-03-31T02:26:24.410 回答