1

我试图找到此类问题的最新示例,但不幸的是没有找到。我正在尝试使用骆驼实现 Web 服务,其行为应如下所示:

  • Camel 通过 GET 或 POST (api/startsearch) 从 Rest-Endpoint 接收输入
  • 一个bean处理输入并生成一个ticket-id
  • 同一个 bean 使用 HTTP-202 或包含重定向 url (api/result?ticket-id=jf3298u23) 的重定向状态代码响应客户端。
  • bean 还将输入传递给 activemq 启动队列,Camel 路由将在其中完成所有长操作处理。
  • 路由完成后,结果应该在重定向 URL (/result?ticket-id=jf3298u23) 中可用。如果处理尚未完成,它应该使用自定义状态代码(如 HTTP-299-processing)进行响应。

所以我的路线是这样的:

rest().path(apiPath).produces("application/json")
            .get(searchEndpoint)
            .to("bean:requestHandler?method=processNewSearch") // generate ticket-id and reply with 202 or 3xx
            .route().inOnly("activemq:queue:start").endRest() // put the incoming message into the start-queue where the processing starts
            .get(resultEndpoint).to("bean:requestHandler?method=returnResult"); // return 299 when processing not done or 200 + result

from("activemq:queue:start")
            .setHeader("recipients").method(new ExtractRecipients(), "extractRecipients")
            .to("activemq:queue:recipientlist");

... etc, etc... until:

from("activemq:queue:output")
            .to("bean:requestHandler?method=saveFinishedSearch");

bean 本身有三种方法:

public void processNewSearch(Exchange exchange) {
    //generate ticket and stuff and finally set Header and body of the response

    exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 202);
    exchange.getOut().setBody(redirectUrl);
}

public void returnResult(Exchange exchange) {
    //handle ticket related stuff, if valid fetch result and create http response:
        exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
        exchange.getOut().setBody(searchResult);
        return;
}

public void saveFinishedSearch(Exchange exchange) {
    // get search Results from the final Message that was processing asynchronously in the background and save it
    finishedSearches.put(ticket, body);
}

我确信这不是使用手动设置的响应代码和消息进行回复的正确方法,但我没有找到另一种方法。

所以目前的问题是骆驼等待整个消息被处理,因此生成的响应.to("bean:requestHandler?method=processNewSearch")什么也不做,因为它只会被放入启动队列。

如何立即使用骆驼返回自定义响应并让路由异步处理请求?

4

1 回答 1

1

首先,你应该坚持HTTP 协议POST,只通过操作触发后台任务。您可能不希望爬虫通过GET请求触发长时间运行的后台进程,对吗?

因此,您还应该使用LocationHTTP 标头来返回资源的 URI,从而可以从中检索有关进程当前状态的更多信息。我也会使用一个通用的 URI,而不是一些重定向。

在您的路线设置中,我通常也将所有路线相关的东西都保留在.route()块中。由于德国法律强制客户备份他们交换的 EDI 消息,我们维护一个目录组装流程和一个 EDI 消息存档系统,该系统将组装在特定时间范围内发送和/或接收的消息。

我们将触发新的归档或组装请求与检索请求的当前状态分开。

rest("/archives")
  .post()
    .bindingMode(RestBindingMode.json)
    .type(ArchiveRequestSettings.class)
    .consumes(MediaType.APPLICATION_JSON)
    .produces(MediaType.APPLICATION_JSON)
    .description("Invokes the generation of a new message archive for 
                 "messages matching a criteria contained in the payload")

    .route().routeId("create-archives")
      // Extract the IP address of the user who invokes the service
      .bean(ExtractClientIP.class)
      // Basic Authentication
      .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
      // check the amount of requests received within a certain time-period
      .bean(receivedRequestFilter)
      // extract specified settings
      .bean(ExtractArchiveRequestSettings.class)
      // forward the task to the archive generation queue
      .to(SomeEndpoints.ARCHIVE_GENERATION_QUEUE)
      // return 202 Accepted response
      .bean(ReturnArchiveRequestCreatedStatus.class)
    .endRest()

  .get("/{archiveId}")
    .bindingMode(RestBindingMode.json)
    .outType(ArchiveRequestEntity.class)
    .produces(MediaType.APPLICATION_JSON)
    .description("Returns the status of the message archive generation process." 
                 + " If the process has finished this operation will return the"
                 + " link to the download location of the generated archive")

    .route().routeId("archive-status")
      // Extract the IP address of the user who invokes the service
      .bean(ExtractClientIP.class)
      // Basic Authentication
      .bean(SpringSecurityContextLoader.class).policy(authorizationPolicy)
      // check the amount of requests received within a certain time-period
      .bean(receivedRequestFilter)
      // return the current state of the task to the client. If the job is done, 
      // the response will also include a download link as wel as an MD5 hash to
      // verify the correctness of the downloaded archive
      .bean(ReturnArchiveRequestStatus.class)
    .endRest();

该类ExtractArchiveRequestSettings仅对接收到的有效负载执行完整性检查,并为缺少的字段设置默认值。然后将请求存储到数据库中,并将其唯一标识符存储到标头中。

ArchiveRequestSetting 看起来像下面的示例(稍微简化)

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ArchiveRequestSettings {

  /** Specifies if sent or received messages should be included in the artifact. Setting this field
   * to 'DELIVERED' will include only delivered documents where the companyUuid of the requesting
   * user matches the documents sender identifier. Specifying this field as 'RECEIVED' will include
   * only documents whose receiver identifier matches the companyUuid of the requesting user. **/
  private String direction;

  /** The naming schema of entries within the archive **/
  private String entryPattern;

  /** The upper timestamp bound to include messages. Entries older than this value will be omitted **/
  @JsonSerialize(using = Iso8601DateSerializer.class)
  @JsonDeserialize(using = Iso8601DateDeserializer.class)
  private Date from;
  /** The lower timestamp bound to include messages. Entries younger than this value will be
   * omitted. If left empty this will include even the most recent messages. **/
  @JsonSerialize(using = Iso8601DateSerializer.class)
  @JsonDeserialize(using = Iso8601DateDeserializer.class)
  private Date till;
}

该类ReturnArchiveRequestCreatedStatus查找存储的请求实体并将其与202 Accepted响应一起返回。

@Handler
public void returnStatus(Exchange exchange) {

    String archiveId = exchange.getIn().getHeader(HeaderConstants.ARCHIVES_REQUEST_ID, String.class);
    ArchiveRequestEntity archive = repository.findOne(archiveId);

    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 202); // Accepted
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setHeader("Location", archiveLocationUrl + "/" + archiveId);

    msg.setBody(archive);

    exchange.setOut(msg);
}

返回存储请求的当前状态可确保客户端可以检查实际应用了哪些设置,并在某些默认设置不方便或需要应用进一步更改时更新它们。

实际的支持过程是通过将交换发送到 Redis 队列开始的,该队列在不同的机器上使用。此过程的输出将是一个包含所请求文件的存档,该文件被上传到一个公共可访问的位置,并且只有链接将存储在请求实体中。请注意,我们有一个自定义的骆驼组件,它为 Redis 队列模拟了一个 seda 端点。使用seda虽然应该足以在不同的线程中开始处理任务。

根据支持进程的当前状态,存储的请求实体将由支持进程更新。在接收到状态请求(通过GET)时,会查询数据存储的当前状态并映射到某些响应:

public class ReturnArchiveRequestStatus {

  @Resource
  private ArchiveRequestRepository repository;

  @Handler
  public void returnArchiveStatus(Exchange exchange) throws JSONException {

    String archiveId = exchange.getIn().getHeader("archiveId", String.class);

    if (StringUtils.isBlank(archiveId)) {
      badRequest(exchange);
      return;
    }

    ArchiveRequestEntity archive = repository.findOne(archiveId);
    if (null == archive) {
      notFound(archiveId, exchange);
      return;
    }

    ok(archive, exchange);
  }

  private void badRequest(Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 400);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);
    JSONObject json = new JSONObject();

    json.put("status", "ERROR");
    json.put("message", "No archive identifier found");

    msg.setBody(json.toString());
    exchange.setOut(msg);
  }

  private void notFound(String archiveId, Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 403);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);
    JSONObject json = new JSONObject();

    json.put("status", "ERROR");
    json.put("message", "Could not find pending archive process with ID " + archiveId);

    msg.setBody(json.toString());
    exchange.setOut(msg);
  }

  private void ok(UserArchiveRequestEntity archive, Exchange exchange) throws JSONException {
    Message msg = new DefaultMessage(exchange.getContext());
    msg.setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
    msg.setHeader(Exchange.CONTENT_TYPE, "application/json; charset=\"utf-8\"");
    msg.setFault(false);

    msg.setBody(archive);
    exchange.setOut(msg);
  }
}

在整个过程中存储和更新的实际实体看起来大致相同(简化):

@Getter
@Setter
@Builder
@ToString
@Document(collection = "archive")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ArchiveRequestEntity {

  /**
   * The current state of the archiving process
   */
  public enum State {
    /** The request to create an archive was cued but not yet processed **/
    QUEUED,
    /** The archive is currently under construction **/
    RUNNING,
    /** The archive was generated successfully. {@link #downloadUrl} should contain the link the
     * archive can be found **/
    FINISHED,
    /** Indicates that the archive generation failed. {@link #error} should indicate the actual
     * reason why the request failed **/
    FAILED
  }

  @Id
  @JsonIgnore
  private String id;

  /** Timestamp the process was triggered **/
  @JsonIgnore
  @Indexed(expireAfterSeconds = DEFAULT_EXPIRE_TIME)
  private Date timestamp = new Date();

  /** The identifier of the company to create the archive for **/
  private String companyUuid;

  /** The state this archive is currently in **/
  private State state = State.QUEUED;

  ...

  /** Marks the upper limit to include entries to the archive. Entries older then this field will
   * not be included in the archives while entries equal or younger than this timestamp will be
   * included unless they are younger than {@link #till} timestamp **/
  @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
  private Date from;
  /** Marks the lower limit to include entries to the archive. Entries younger than this field will
   * not be included in the archive **/
  @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ssXX")
  private Date till;

  /** Information on why the archive creation failed **/
  private String error;

  /** The URL of the final archive to download **/
  private String downloadUrl;

  /** The MD5 Hash of the final artifact in order to guarantee clients an unmodified version of the
   * archive **/
  private String md5Hash;

  ...
}

请注意,无论处理状态的当前状态如何,都将200 OK使用进程状态的当前 JSON 表示形式返回。客户端将看到FINISHED具有downloadUrlmd5Hash属性设置的状态或具有不同可用属性的不同状态。

当然,支持过程需要适当地更新请求状态,否则客户端将无法检索有关请求当前状态的正确信息。

这种方法应该适用于几乎所有长时间运行的进程,尽管您传递的信息的内部可能与我们的场景不同。希望这会有所帮助

于 2018-06-13T14:50:10.503 回答