0

我尝试为下一个 Spring Cloud Stream 版本准备我们的应用程序。(目前使用 3.0.0.RC1)。使用 Kafka 活页夹。

现在我们收到一条消息,对其进行处理并将其重新发送到另一个主题。分别处理每条消息会导致对我们数据库的大量单个请求。

在 3.0.0 版本中,我们希望将消息作为批处理处理,因此我们可以将数据保存在批处理更新中。

在当前版本中我们使用@EnableBinding、@StreamListener

@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
    publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}

void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
    for ( final ExchangeableItem exchangeableItem : item ) {
        final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
                            .setHeader( "partitionKey", exchangeableItem.getId() )
                            .build();
        messageChannel.send( message )
    }
}

我已将使用者属性设置为“批处理模式”并将签名更改为List<>,但这样做会导致收到 aList<byte[]>而不是预期的List<ExchangeableStock>。Ofc 之后可以进行转换,但这感觉就像“meh”,我认为这应该在调用 Listener 之前发生。

然后我尝试了(新的)功能版本,并且使用效果很好。我也喜欢这个简单的处理版本

@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
    return articleService::updateStockInformation;
}

但是输出主题现在接收到一个对象列表作为一条消息,并且后续消费者无法正常工作。

我想我错过了什么...

我是否需要添加某种 MessageConverter (对于注释驱动版本)或者是否有办法通过功能版本实现所需的行为?

4

2 回答 2

0

IIRC,批处理模式仅支持功能。

您不能Consumer<List< ExchangeableStock>>像当前在 StreamListener 中那样使用消息并将消息发送到频道吗?

于 2019-11-08T13:47:29.577 回答
0

我已经做到了:

@Bean
@Measure
public Consumer<List<ExchangeableStock>> stocks() {
    return items -> {
        for ( final ExchangeableStock exchangeableItem : articleService. updateStockInformation( items ) ) {
            final Message<?> message = MessageBuilder.withPayload( exchangeableItem )
                            .setHeader( "partitionKey", exchangeableItem.getId() )
                            .setHeader( KafkaHeaders.TOPIC, "stocks-stg" )
                            .build();

            processor.onNext( message );
        }
    };
}

private final TopicProcessor<Message<?>> processor = TopicProcessor.create();

@Bean
@Measure
public Supplier<Flux<?>> source() {
    return () -> processor;
}

但是动态目的地解析对我不起作用。我尝试使用KafkaHeaders.TOPICandspring.cloud.stream.sendto.destination作为标头,并设置 Kafka 绑定生产者属性use-topic-header: true(用于绑定source-out-0

如果我为source-out-0它的工作设置一个目标,但这样做会导致很多TopicProceessors 和Suppliers - 我们有大约 10 种不同的消息类型。

也许我错过了一些小东西来让动态目的地解析工作......

于 2019-11-12T08:28:05.400 回答