0

解决了!向下滚动到解决方案。

我有实体 Person,在表 A 上有一些基本数据,在表 B、C、D 等(例如地址)上有更具体的数据。

PersonResponseDTO(总结):

{
    "id": 1,
    "name": "Test"
}

AddressResponseDTO(总结):

{
    "person_id": 1,
    "street": "Test St."
}

这些数据来自一个名为 using from("direct:getPersonById").to(getPersonUrl)and from("direct:getAddressByPersonId").to(getAddressUrl)(summarized) 的外部 API。

我创建了第三个名为 AggregatedPersonResponseDTO 的对象:

{
    "person": {
        "id": 1,
        "name": "Test"
    },
    "address": {
        "person_id": 1,
        "street": "Test St."
    }
}

有没有一种简单的方法可以在一个请求中加入两个响应,返回一个 AggregatedPersonResponseDTO 类型的对象,只使用 Camel API?我想使用两个响应对象来构建第三个。而且我将来会有两个以上“连接”的用例。

解决方案说明

  1. 不需要将 streamCaching 设置为 true 或 false。

  2. 不需要设置 HTTP_PATH。

  3. 骆驼路线中的代码:

from("direct:getFullPersonByIdService")
    .toD("http:{{endpoints.get-person-by-id}}?bridgeEndpoint=true")
    .pollEnrich(
        simple("http:{{endpoints.get-address-by-person-id}}?bridgeEndpoint=true"),
        5000,
        new PersonAggregationStrategy(),
        false
    )
    .unmarshal(new JacksonDataFormat(GetAggregatedPersonResponseDTO.class))

双花括号之间的内容从 application.yml 或 application.properties 中读取。

  1. 整个 PersonAggregationStrategy 类:
@Log4j2
public class PersonAggregationStrategy implements AggregationStrategy {

    @SneakyThrows
    @Override
    public Exchange aggregate(final Exchange exchangePerson,
                              final Exchange exchangeAddress) {
        log.info("Aggregating Person and Address...");

        ObjectMapper objectMapper = new ObjectMapper();

        final GetAggregatedPersonResponseDTO aggregatedPerson = new GetAggregatedPersonResponseDTO();
        aggregatedPerson.setPerson(objectMapper.readValue(exchangePerson.getIn().getBody(String.class), GetPersonResponseDTO.class));
        aggregatedPerson.setAddress(objectMapper.readValue(exchangeAddress.getIn().getBody(String.class), GetAddressResponseDTO.class));

        exchangePerson.getIn().setBody(objectMapper.writeValueAsString(aggregatedPerson));
        log.info("Aggregated object => {}", objectMapper.writeValueAsString(aggregatedPerson));

        return exchangePerson;
    }

}
  1. 我还必须为聚合的结果对象实现 TypeConverters 接口:
@Component
public class AggregatedPersonConverter implements TypeConverters {

    private final ObjectMapper mapper;

    @Autowired
    public AggregatedPersonConverter(ObjectMapper mapper) {
        this.mapper = mapper;
    }

    @Converter
    public InputStream getAggregatedPersonResponseDTOToInputStream(GetAggregatedPersonResponseDTO source) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        try {
            ObjectOutputStream oos = new ObjectOutputStream(baos);

            oos.writeObject(source);

            oos.flush();
            oos.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return new ByteArrayInputStream(baos.toByteArray());
    }

}
  1. 我不知道它是否适用于两个以上的回调。也许它需要 AggregationStrategy 的其他实现。有一天我会测试这个用例。
4

1 回答 1

1

您必须对路由进行建模,以使用第二个来丰富第一个 Web 服务结果。必须在您的AggregationStrategy实例中指定合并两个响应的方式。

查看丰富的 EIP: https ://camel.apache.org/components/3.14.x/eips/enrich-eip.html

于 2022-02-18T11:44:21.917 回答