I have a Spring Boot app (app0) that uses Spring Cloud Stream Kafka to read from a topic.

There are two other apps (app1, app2) that produce messages into that topic.

app1 publishes messages using an interface OrderSource:

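import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OrderSource {

    // Binding (channel) name; resolves to the payment-results topic
    String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;

    @Output(OrderSource.OUTPUT_PAYMENT)
    MessageChannel output();
}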

For instance:

orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);

In this case, app0 reads the messages from app1 without any problem.

app2 publishes its messages using KafkaTemplate:

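ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
    SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    // handle the failed or timed-out delivery
}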

In this case I am observing the following exception from EmbeddedHeadersMessageConverter:

java.lang.StringIndexOutOfBoundsException: String index out of range: 152
    at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
    at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]

Apparently it is trying to extract headers from the payload of the message. How can I prevent this exception while still supporting both sources of messages (KafkaTemplate and OrderSource)?

1 Answer

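To interoperate with applications that do not use Spring Cloud Stream, you need to set headerMode to raw on the consumer.

You also need to do the same on app1's producer, so that it does not embed headers in the payload.

See the consumer properties and producer properties in the Spring Cloud Stream documentation.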
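As a rough sketch, the two settings could look like this in each app's application.properties. The binding names are placeholders, not taken from the question: `<inputChannel>` stands for whatever input channel app0 binds to the topic, and `<outputChannel>` for the value of OrderSource.OUTPUT_PAYMENT in app1.

```properties
# app0 (consumer): do not try to parse embedded headers out of the payload
spring.cloud.stream.bindings.<inputChannel>.consumer.headerMode=raw

# app1 (producer): do not embed headers into the payload
spring.cloud.stream.bindings.<outputChannel>.producer.headerMode=raw
```

With both set to raw, app0 receives the bare payload from app1 and app2 alike. Message headers sent by app1 (content type, for example) no longer travel with the message, so app0 has to know the payload format itself.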

answered 2017-02-11T17:34:06.633