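First of all, you should stick to the HTTP protocol and only trigger background tasks through POST operations. GET is meant to be a safe, side-effect-free method that crawlers, prefetchers and caches call freely, and you probably don't want a crawler to kick off a long-running background process, right?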
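You should also use the Location HTTP header to return the URI of a resource from which further information on the current state of the process can be retrieved. I'd return a plain, generic URI there rather than some redirect.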
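Stripped of Camel, the contract boils down to the following. This is a minimal sketch using only the JDK's built-in com.sun.net.httpserver server; the class name ArchiveStatusServer, the in-memory map and the JSON bodies are hypothetical. POST /archives answers 202 Accepted plus a Location header, and a GET on that location answers 200 with the current state or 404 for an unknown ID:

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, framework-free illustration of the POST/202/Location + GET/status contract.
final class ArchiveStatusServer {
    // In-memory stand-in for the request repository (archiveId -> state)
    final Map<String, String> states = new ConcurrentHashMap<>();
    final HttpServer server;

    ArchiveStatusServer() throws IOException {
        // Port 0 lets the OS pick a free port
        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/archives", this::handle);
    }

    int port() {
        return server.getAddress().getPort();
    }

    private void handle(HttpExchange ex) throws IOException {
        ex.getRequestBody().readAllBytes(); // drain the request body (payload ignored in this sketch)
        String path = ex.getRequestURI().getPath();
        String method = ex.getRequestMethod();
        if ("POST".equals(method) && "/archives".equals(path)) {
            // Only POST creates a task; the response points at the status resource
            String id = UUID.randomUUID().toString();
            states.put(id, "QUEUED");
            ex.getResponseHeaders().set("Location", "/archives/" + id);
            send(ex, 202, "{\"state\":\"QUEUED\"}");
        } else if ("GET".equals(method) && path.startsWith("/archives/")) {
            // GET only reads the current state, it never starts anything
            String state = states.get(path.substring("/archives/".length()));
            if (state == null) {
                send(ex, 404, "{\"status\":\"ERROR\"}");
            } else {
                send(ex, 200, "{\"state\":\"" + state + "\"}");
            }
        } else {
            send(ex, 405, "{\"status\":\"ERROR\"}");
        }
    }

    private static void send(HttpExchange ex, int code, String json) throws IOException {
        byte[] body = json.getBytes(StandardCharsets.UTF_8);
        ex.getResponseHeaders().set("Content-Type", "application/json; charset=utf-8");
        ex.sendResponseHeaders(code, body.length);
        try (OutputStream os = ex.getResponseBody()) {
            os.write(body);
        }
    }
}
```

The Location value here is relative, which HTTP permits; returning an absolute URL (as archiveLocationUrl does further down) works just as well.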
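In your route setup, I'd also keep everything route-related inside the .route() block.
As German law obliges customers to back up the EDI messages they exchange, we maintain a catalogue assembly process and an EDI message archive system that assembles the messages sent and/or received within a given time frame.
We keep triggering a new archive or assembly request separate from retrieving the current state of a request: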
rest("/archives")
    .post()
        .bindingMode(RestBindingMode.json)
        .type(ArchiveRequestSettings.class)
        .consumes(MediaType.APPLICATION_JSON)
        .produces(MediaType.APPLICATION_JSON)
        .description("Invokes the generation of a new message archive for"
            + " messages matching the criteria contained in the payload")
        .route().routeId("create-archives")
            // Extract the IP address of the user who invokes the service
            .bean(ExtractClientIP.class)
            // Basic Authentication
            .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
            // check the number of requests received within a certain time period
            .bean(receivedRequestFilter)
            // extract the specified settings
            .bean(ExtractArchiveRequestSettings.class)
            // forward the task to the archive generation queue
            .to(SomeEndpoints.ARCHIVE_GENERATION_QUEUE)
            // return a 202 Accepted response
            .bean(ReturnArchiveRequestCreatedStatus.class)
        .endRest()
    .get("/{archiveId}")
        .bindingMode(RestBindingMode.json)
        .outType(ArchiveRequestEntity.class)
        .produces(MediaType.APPLICATION_JSON)
        .description("Returns the status of the message archive generation process."
            + " If the process has finished, this operation will return the"
            + " link to the download location of the generated archive")
        .route().routeId("archive-status")
            // Extract the IP address of the user who invokes the service
            .bean(ExtractClientIP.class)
            // Basic Authentication
            .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
            // check the number of requests received within a certain time period
            .bean(receivedRequestFilter)
            // return the current state of the task to the client. If the job is done,
            // the response will also include a download link as well as an MD5 hash to
            // verify the correctness of the downloaded archive
            .bean(ReturnArchiveRequestStatus.class)
        .endRest();
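The ExtractArchiveRequestSettings class only performs sanity checks on the received payload and sets default values for missing fields. The request is then stored in the database, and its unique identifier is put into a header (HeaderConstants.ARCHIVES_REQUEST_ID, which ReturnArchiveRequestCreatedStatus reads later on).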
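A rough, framework-free sketch of what such a sanity-check-and-defaults step might look like. The concrete defaults (DELIVERED, a 30-day window, the entry pattern) and the names ArchiveSettingsDefaults and Settings are assumptions for illustration, not the values used in our setup; java.time.Instant replaces Date for brevity:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Set;

// Hypothetical sketch of the "sanity check + defaults" step; all default values are assumptions.
final class ArchiveSettingsDefaults {
    static final Set<String> DIRECTIONS = Set.of("DELIVERED", "RECEIVED");
    static final String DEFAULT_DIRECTION = "DELIVERED";           // assumed default
    static final String DEFAULT_ENTRY_PATTERN = "{messageId}.edi";  // assumed default
    static final Duration DEFAULT_WINDOW = Duration.ofDays(30);     // assumed default

    // Simplified mirror of ArchiveRequestSettings, using Instant instead of Date
    static final class Settings {
        String direction;
        String entryPattern;
        Instant from;
        Instant till; // null = include even the most recent messages
    }

    /** Validates the payload, fills in missing fields in place and returns the same instance. */
    static Settings applyDefaults(Settings s, Instant now) {
        if (s.direction == null) {
            s.direction = DEFAULT_DIRECTION;
        } else if (!DIRECTIONS.contains(s.direction)) {
            throw new IllegalArgumentException("Unknown direction: " + s.direction);
        }
        if (s.entryPattern == null || s.entryPattern.isBlank()) {
            s.entryPattern = DEFAULT_ENTRY_PATTERN;
        }
        if (s.from == null) {
            s.from = now.minus(DEFAULT_WINDOW);
        }
        if (s.till != null && s.till.isBefore(s.from)) {
            throw new IllegalArgumentException("'till' must not lie before 'from'");
        }
        return s;
    }
}
```

Filling the gaps in place is what makes the later 202 response useful: the client gets back the settings that were actually applied.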
ArchiveRequestSettings looks like the sample below (slightly simplified):
@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ArchiveRequestSettings {
    /** Specifies whether sent or received messages should be included in the artifact. Setting
     * this field to 'DELIVERED' will include only delivered documents whose sender identifier
     * matches the companyUuid of the requesting user. Setting this field to 'RECEIVED' will include
     * only documents whose receiver identifier matches the companyUuid of the requesting user. **/
    private String direction;
    /** The naming schema of entries within the archive **/
    private String entryPattern;
    /** The lower timestamp bound for included messages. Entries older than this value will be
     * omitted **/
    @JsonSerialize(using = Iso8601DateSerializer.class)
    @JsonDeserialize(using = Iso8601DateDeserializer.class)
    private Date from;
    /** The upper timestamp bound for included messages. Entries younger than this value will be
     * omitted. If left empty, even the most recent messages will be included. **/
    @JsonSerialize(using = Iso8601DateSerializer.class)
    @JsonDeserialize(using = Iso8601DateDeserializer.class)
    private Date till;
}
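To illustrate how direction, from and till are meant to combine, here is a hypothetical filter predicate. MessageMeta and its fields are made up for the example; both bounds are treated as inclusive and a missing till means no upper bound, following the field descriptions:

```java
import java.time.Instant;

// Hypothetical sketch of how direction/from/till could select messages; types are assumptions.
final class ArchiveFilter {
    enum Direction { DELIVERED, RECEIVED }

    // Minimal message metadata needed for the decision
    static final class MessageMeta {
        final String senderId;
        final String receiverId;
        final Instant timestamp;

        MessageMeta(String senderId, String receiverId, Instant timestamp) {
            this.senderId = senderId;
            this.receiverId = receiverId;
            this.timestamp = timestamp;
        }
    }

    /** True if the message belongs in the company's archive. Both bounds are inclusive. */
    static boolean include(MessageMeta m, String companyUuid, Direction direction,
                           Instant from, Instant till) {
        boolean ownMessage = direction == Direction.DELIVERED
                ? companyUuid.equals(m.senderId)    // sent by the requesting company
                : companyUuid.equals(m.receiverId); // received by the requesting company
        boolean notTooOld = from == null || !m.timestamp.isBefore(from);  // timestamp >= from
        boolean notTooYoung = till == null || !m.timestamp.isAfter(till); // timestamp <= till
        return ownMessage && notTooOld && notTooYoung;
    }
}
```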
The ReturnArchiveRequestCreatedStatus class looks up the stored request entity and returns it together with a 202 Accepted response:
@Handler
public void returnStatus(Exchange exchange) {
    String archiveId = exchange.getIn().getHeader(HeaderConstants.ARCHIVES_REQUEST_ID, String.class);
    ArchiveRequestEntity archive = repository.findOne(archiveId);
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 202); // Accepted
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setHeader("Location", archiveLocationUrl + "/" + archiveId);
    msg.setBody(archive);
    exchange.setOut(msg);
}
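Returning the current state of the stored request lets clients check which settings were actually applied, and update them if some of the defaults are inconvenient or further changes are needed.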
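The actual backing process is started by sending the exchange to a Redis queue, which is consumed on a different machine. The output of this process is an archive containing the requested files; it is uploaded to a publicly accessible location, and only the link is stored in the request entity. Note that we have a custom Camel component that mimics a seda endpoint for Redis queues. Plain seda, though, should be sufficient to start processing the task in a different thread. In that case make sure the exchange is sent as InOnly (or set waitForTaskToComplete=Never on the seda endpoint): the REST exchange is InOut, so seda would otherwise wait for the consumer to finish before the 202 response goes out.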
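Just to illustrate the hand-off itself, here is a plain-JDK analogue that swaps the queue for an ExecutorService. This is not the seda or Redis component; ArchiveWorker, buildArchive and the download URL are hypothetical. submit() records QUEUED and returns at once, while the worker thread moves the state to RUNNING and then to FINISHED or FAILED:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for "send to a queue, process on another thread".
final class ArchiveWorker {
    enum State { QUEUED, RUNNING, FINISHED, FAILED }

    // In-memory stand-ins for fields of the stored request entity
    final Map<String, State> states = new ConcurrentHashMap<>();
    final Map<String, String> downloadUrls = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    /** Called from the POST handler: records QUEUED, hands the work off and returns immediately. */
    Future<?> submit(String archiveId) {
        states.put(archiveId, State.QUEUED);
        return pool.submit(() -> {
            states.put(archiveId, State.RUNNING);
            try {
                String url = buildArchive(archiveId);
                downloadUrls.put(archiveId, url);    // publish the link before the state flips
                states.put(archiveId, State.FINISHED);
            } catch (RuntimeException e) {
                states.put(archiveId, State.FAILED); // the real entity would also record the error
            }
        });
    }

    /** Placeholder for the slow archive generation and upload; returns a made-up URL. */
    String buildArchive(String archiveId) {
        if (archiveId.isEmpty()) {
            throw new IllegalArgumentException("empty archive id");
        }
        return "https://downloads.example.com/" + archiveId + ".zip";
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Writing downloadUrl before switching to FINISHED means a status request that already sees FINISHED also finds the link.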
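The backing process updates the stored request entity as its state changes. When a status request arrives (via GET), the data store is queried for the current state, which is then mapped to a response: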
public class ReturnArchiveRequestStatus {
    @Resource
    private ArchiveRequestRepository repository;
    @Handler
    public void returnArchiveStatus(Exchange exchange) throws JSONException {
        // the {archiveId} path parameter of the REST DSL is available as a header
        String archiveId = exchange.getIn().getHeader("archiveId", String.class);
        if (StringUtils.isBlank(archiveId)) {
            badRequest(exchange);
            return;
        }
        ArchiveRequestEntity archive = repository.findOne(archiveId);
        if (null == archive) {
            notFound(archiveId, exchange);
            return;
        }
        ok(archive, exchange);
    }
    private void badRequest(Exchange exchange) throws JSONException {
        Message msg = new DefaultMessage(exchange.getContext());
        msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 400); // Bad Request
        msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
        msg.setFault(false);
        JSONObject json = new JSONObject();
        json.put("status", "ERROR");
        json.put("message", "No archive identifier found");
        msg.setBody(json.toString());
        exchange.setOut(msg);
    }
    private void notFound(String archiveId, Exchange exchange) throws JSONException {
        Message msg = new DefaultMessage(exchange.getContext());
        msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 404); // Not Found
        msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
        msg.setFault(false);
        JSONObject json = new JSONObject();
        json.put("status", "ERROR");
        json.put("message", "Could not find pending archive process with ID " + archiveId);
        msg.setBody(json.toString());
        exchange.setOut(msg);
    }
    private void ok(ArchiveRequestEntity archive, Exchange exchange) throws JSONException {
        Message msg = new DefaultMessage(exchange.getContext());
        msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 200); // OK
        msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
        msg.setFault(false);
        msg.setBody(archive);
        exchange.setOut(msg);
    }
}
The actual entity that is stored and updated throughout the process looks roughly like this (simplified):
@Getter
@Setter
@Builder
@ToString
@Document(collection = "archive")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ArchiveRequestEntity {
    /**
     * The current state of the archiving process
     */
    public enum State {
        /** The request to create an archive was queued but not yet processed **/
        QUEUED,
        /** The archive is currently under construction **/
        RUNNING,
        /** The archive was generated successfully. {@link ArchiveRequestEntity#downloadUrl} should
         * contain the link where the archive can be found **/
        FINISHED,
        /** Indicates that the archive generation failed. {@link ArchiveRequestEntity#error} should
         * indicate the actual reason why the request failed **/
        FAILED
    }
    @Id
    @JsonIgnore
    private String id;
    /** Timestamp the process was triggered **/
    @JsonIgnore
    @Indexed(expireAfterSeconds = DEFAULT_EXPIRE_TIME)
    @Builder.Default // without it, Lombok's builder ignores the initializer below
    private Date timestamp = new Date();
    /** The identifier of the company to create the archive for **/
    private String companyUuid;
    /** The state this archive is currently in **/
    @Builder.Default // without it, Lombok's builder ignores the initializer below
    private State state = State.QUEUED;
    ...
    /** Marks the lower limit for entries included in the archive. Entries older than this field
     * will not be included in the archive, while entries equal to or younger than this timestamp
     * will be included unless they are younger than the {@link #till} timestamp **/
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
    private Date from;
    /** Marks the upper limit for entries included in the archive. Entries younger than this field
     * will not be included in the archive **/
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
    private Date till;
    /** Information on why the archive creation failed **/
    private String error;
    /** The URL of the final archive to download **/
    private String downloadUrl;
    /** The MD5 hash of the final artifact, so that clients can verify they received an unmodified
     * version of the archive **/
    private String md5Hash;
    ...
}
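The md5Hash property lets a client verify its download. A minimal sketch with the JDK's MessageDigest, assuming the hash is stored as a hex string (the answer doesn't say whether it is hex or Base64; ArchiveChecksum is a hypothetical name). MD5 reliably catches accidental corruption, but it is not collision-resistant, so if deliberate tampering is a concern, SHA-256 would be the safer choice:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical client-side check of a downloaded archive against the md5Hash property.
final class ArchiveChecksum {
    /** Lowercase hex MD5 of the given bytes. */
    static String md5Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(data)) {
                hex.append(String.format("%02x", b)); // %x formats a negative byte as unsigned
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be supported by every Java platform", e);
        }
    }

    /** True if the downloaded bytes match the hex hash reported by the status resource. */
    static boolean matches(byte[] downloaded, String expectedMd5Hex) {
        return md5Hex(downloaded).equalsIgnoreCase(expectedMd5Hex);
    }
}
```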
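Note that, regardless of the current processing state, 200 OK is returned together with the current JSON representation of the process state. The client will either see a FINISHED state with the downloadUrl and md5Hash properties set, or a different state with a different set of available properties (for example error for FAILED).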
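On the client side this translates into a simple polling loop. A sketch, assuming the client extracts the state string from each status response through some fetchState supplier (StatusPoller and its backoff parameters are hypothetical):

```java
import java.util.function.Supplier;

// Hypothetical client-side helper; fetchState would typically GET the Location URI.
final class StatusPoller {
    /** Polls until FINISHED/FAILED or maxAttempts is reached; returns the last state seen. */
    static String pollUntilTerminal(Supplier<String> fetchState, int maxAttempts,
                                    long initialDelayMillis, long maxDelayMillis)
            throws InterruptedException {
        long delay = initialDelayMillis;
        String state = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            state = fetchState.get();
            if ("FINISHED".equals(state) || "FAILED".equals(state)) {
                return state; // terminal: downloadUrl/md5Hash or error are in the same response
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMillis); // exponential backoff with a cap
            }
        }
        return state; // still QUEUED or RUNNING after maxAttempts
    }
}
```

Backing off keeps impatient clients from running into the request-rate filter on the status route.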
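Of course, the backing process needs to update the request state accordingly; otherwise clients won't be able to retrieve correct information about the current state of their request.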
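One way to make "update the request state accordingly" harder to get wrong is to let the state enum guard its own transitions. The rules below (QUEUED may move to RUNNING or FAILED, RUNNING to FINISHED or FAILED, both final states are terminal) and the name ArchiveState are assumptions, not taken from the setup described above:

```java
import java.util.EnumSet;
import java.util.Map;

// Hypothetical transition rules mirroring ArchiveRequestEntity.State.
enum ArchiveState {
    QUEUED, RUNNING, FINISHED, FAILED;

    // Allowed successor states; terminal states have none (assumed rules)
    private static final Map<ArchiveState, EnumSet<ArchiveState>> NEXT = Map.of(
            QUEUED, EnumSet.of(RUNNING, FAILED),
            RUNNING, EnumSet.of(FINISHED, FAILED),
            FINISHED, EnumSet.noneOf(ArchiveState.class),
            FAILED, EnumSet.noneOf(ArchiveState.class));

    boolean isTerminal() {
        return NEXT.get(this).isEmpty();
    }

    /** Returns next if the transition is allowed, otherwise throws. */
    ArchiveState moveTo(ArchiveState next) {
        if (!NEXT.get(this).contains(next)) {
            throw new IllegalStateException(this + " -> " + next + " is not allowed");
        }
        return next;
    }
}
```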
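This approach should work for almost any long-running process, although the information you pass around internally will probably differ from our scenario. Hope this helps.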