我尝试为下一个 Spring Cloud Stream 版本准备我们的应用程序。(目前使用 3.0.0.RC1)。使用 Kafka 活页夹。
现在我们收到一条消息,对其进行处理并将其重新发送到另一个主题。分别处理每条消息会导致对我们数据库的大量单个请求。
在 3.0.0 版本中,我们希望将消息作为批处理处理,因此我们可以将数据保存在批处理更新中。
在当前版本中我们使用@EnableBinding、@StreamListener
@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}
void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
for ( final ExchangeableItem exchangeableItem : item ) {
final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
.setHeader( "partitionKey", exchangeableItem.getId() )
.build();
messageChannel.send( message )
}
}
我已将使用者属性设置为“批处理模式”并将签名更改为List<>
,但这样做会导致收到 aList<byte[]>
而不是预期的List<ExchangeableStock>
。Ofc 之后可以进行转换,但这感觉就像“meh”,我认为这应该在调用 Listener 之前发生。
然后我尝试了(新的)功能版本,并且使用效果很好。我也喜欢这个简单的处理版本
@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
return articleService::updateStockInformation;
}
但是输出主题现在接收到一个对象列表作为一条消息,并且后续消费者无法正常工作。
我想我错过了什么...
我是否需要添加某种 MessageConverter (对于注释驱动版本)或者是否有办法通过功能版本实现所需的行为?