解决了!向下滚动到解决方案。
我有实体 Person,在表 A 上有一些基本数据,在表 B、C、D 等(例如地址)上有更具体的数据。
PersonResponseDTO(总结):
{
"id": 1,
"name": "Test"
}
AddressResponseDTO(总结):
{
"person_id": 1,
"street": "Test St."
}
这些数据来自一个名为 using from("direct:getPersonById").to(getPersonUrl)
and from("direct:getAddressByPersonId").to(getAddressUrl)
(summarized) 的外部 API。
我创建了第三个名为 AggregatedPersonResponseDTO 的对象:
{
"person": {
"id": 1,
"name": "Test"
},
"address": {
"person_id": 1,
"street": "Test St."
}
}
有没有一种简单的方法可以在一个请求中加入两个响应,返回一个 AggregatedPersonResponseDTO 类型的对象,只使用 Camel API?我想使用两个响应对象来构建第三个。而且我将来会有两个以上“连接”的用例。
解决方案说明
不需要将 streamCaching 设置为 true 或 false。
不需要设置 HTTP_PATH。
骆驼路线中的代码:
from("direct:getFullPersonByIdService")
.toD("http:{{endpoints.get-person-by-id}}?bridgeEndpoint=true")
.pollEnrich(
simple("http:{{endpoints.get-address-by-person-id}}?bridgeEndpoint=true"),
5000,
new PersonAggregationStrategy(),
false
)
.unmarshal(new JacksonDataFormat(GetAggregatedPersonResponseDTO.class))
双花括号之间的内容从 application.yml 或 application.properties 中读取。
- 整个 PersonAggregationStrategy 类:
@Log4j2
public class PersonAggregationStrategy implements AggregationStrategy {
@SneakyThrows
@Override
public Exchange aggregate(final Exchange exchangePerson,
final Exchange exchangeAddress) {
log.info("Aggregating Person and Address...");
ObjectMapper objectMapper = new ObjectMapper();
final GetAggregatedPersonResponseDTO aggregatedPerson = new GetAggregatedPersonResponseDTO();
aggregatedPerson.setPerson(objectMapper.readValue(exchangePerson.getIn().getBody(String.class), GetPersonResponseDTO.class));
aggregatedPerson.setAddress(objectMapper.readValue(exchangeAddress.getIn().getBody(String.class), GetAddressResponseDTO.class));
exchangePerson.getIn().setBody(objectMapper.writeValueAsString(aggregatedPerson));
log.info("Aggregated object => {}", objectMapper.writeValueAsString(aggregatedPerson));
return exchangePerson;
}
}
- 我还必须为聚合的结果对象实现 TypeConverters 接口:
@Component
public class AggregatedPersonConverter implements TypeConverters {
private final ObjectMapper mapper;
@Autowired
public AggregatedPersonConverter(ObjectMapper mapper) {
this.mapper = mapper;
}
@Converter
public InputStream getAggregatedPersonResponseDTOToInputStream(GetAggregatedPersonResponseDTO source) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(source);
oos.flush();
oos.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return new ByteArrayInputStream(baos.toByteArray());
}
}
- 我不知道它是否适用于两个以上的回调。也许它需要 AggregationStrategy 的其他实现。有一天我会测试这个用例。