I am using Spring Integration in a Java REST API to produce messages to Kafka: https://github.com/spring-projects/spring-integration-kafka
When I call
kafkaMessageChannel.send(MessageBuilder.withPayload("hi kafka from rest api").build());
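it returns true, which I take to mean the message was published, but I cannot see the message in the topic log. I checked the logs under /usr/local/var/lib/kafka-logs/test-0.
Even with the console consumer running, I cannot see the messages published from the API: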
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
However, if I produce a message from the console producer, I can see it both in the topic log and in the console consumer:
kafka-console-producer.sh --broker-list localhost:9092 --topic test
I am using the same broker list and topic name in the Spring Integration REST API.
My Spring outbound configuration XML:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <int:channel id="producerKafka">
        <int:queue/>
    </int:channel>
    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="producerKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>
    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              topic="test"
                                              sync="true"
                                              key-class-type="java.lang.String"
                                              key-encoder="encoder"
                                              value-class-type="java.lang.String"
                                              value-encoder="encoder"
                                              partitioner="partitioner"
                                              compression-type="none"
                                              producer-listener="kafkalistner"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
    <bean id="encoder"
          class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
    <bean id="kafkalistner" class="kafka.producer.rest.model.ProducerListnerKafka"/>
    <bean id="partitioner" class="org.springframework.integration.kafka.support.DefaultPartitioner"/>
</beans>
And the listener implementation:
public class ProducerListnerKafka extends LoggingProducerListener {
    @Override
    public void setIncludeContents(boolean includeContents) {
        super.setIncludeContents(true);
    }

    @Override
    public void onError(String topic, Integer partition, Object key, Object payload, Exception exception) {
        super.onError(topic, partition, key, payload, exception);
    }

    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        String kafkatopic = recordMetadata.topic();
    }
}
I tried debugging it, and execution never even reaches the onError method.
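One detail in the configuration above that may matter: producerKafka is declared with <int:queue/>, so as far as I understand, send() returning true might only confirm that the message was accepted into the channel's queue, not that the outbound adapter has delivered it to Kafka. Below is a minimal plain-Java sketch of that hand-off. It uses java.util.concurrent instead of any Spring Integration class, and QueueBackedChannel, send, poll and pending are hypothetical names made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandOffSketch {

    // Hypothetical stand-in for a queue-backed channel; NOT a Spring Integration class.
    static final class QueueBackedChannel {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Returns true as soon as the payload is enqueued; nothing has been delivered yet.
        boolean send(String payload) {
            return queue.offer(payload);
        }

        // What a poller would call later to hand the payload to the real sender.
        String poll() {
            return queue.poll();
        }

        // Number of payloads accepted but not yet picked up.
        int pending() {
            return queue.size();
        }
    }

    public static void main(String[] args) {
        QueueBackedChannel channel = new QueueBackedChannel();

        // The caller only learns that the payload was accepted into the queue.
        boolean accepted = channel.send("hi kafka from rest api");
        System.out.println("send returned: " + accepted);
        System.out.println("still waiting in queue: " + channel.pending());

        // Delivery happens only when something polls the queue.
        String pickedUp = channel.poll();
        System.out.println("picked up by poller: " + pickedUp);
    }
}
```

If that is what is happening here, the things to check would be whether the poller on kafkaOutboundChannelAdapter is actually running, and whether kafkaMessageChannel in my code is the same channel as producerKafka in the XML.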