I am using EmbeddedKafkaRule in a SpringBootTest class as follows:
    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
                embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {
            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }
        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                new DefaultKafkaProducerFactory<Integer, String>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }
When the statement 'container.start();' executes, the log keeps printing the following exception:
2020-05-22 19:12:19.307 INFO 50295 --- [ Test worker] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 10
auto.offset.reset = earliest
bootstrap.servers = [127.0.0.1:0]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = testT
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 60000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2020-05-22 19:12:19.590 INFO 50295 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-05-22 19:12:19.590 INFO 50295 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-05-22 19:12:19.591 INFO 50295 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1590199939584
2020-05-22 19:12:19.605 INFO 50295 --- [ Test worker] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-testT-1, groupId=testT] Subscribed to topic(s): templateTopic
2020-05-22 19:12:19.615 INFO 50295 --- [ Test worker] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-05-22 19:12:20.157 WARN 50295 --- [mplateTests-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testT-1, groupId=testT] Error connecting to node 127.0.0.1:0 (id: -1 rack: null)
java.net.BindException: Can't assign requested address
at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]
at java.base/sun.nio.ch.Net.connect(Net.java:474) ~[na:na]
at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:694) ~[na:na]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:277) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.common.network.Selector.connect(Selector.java:255) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:957) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:293) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:495) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:236) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:463) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1089) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1045) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:970) ~[spring-kafka-2.5.0.RELEASE.jar:2.5.0.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2020-05-22 19:12:20.159 WARN 50295 --- [mplateTests-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testT-1, groupId=testT] Bootstrap broker 127.0.0.1:0 (id: -1 rack: null) disconnected
2020-05-22 19:12:20.249 WARN 50295 --- [mplateTests-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testT-1, groupId=testT] Error connecting to node 127.0.0.1:0 (id: -1 rack: null)
java.net.BindException: Can't assign requested address
at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]
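I tried changing the default port from 0 to a non-zero value, but that did not help. With the non-zero port I get the following messages instead: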
2020-05-22 16:23:59.388 INFO 21162 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-05-22 16:23:59.388 INFO 21162 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-05-22 16:23:59.388 INFO 21162 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1590189839385
2020-05-22 16:23:59.407 WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9090) could not be established. Broker may not be available.
2020-05-22 16:23:59.407 WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9090 (id: -1 rack: null) disconnected
2020-05-22 16:23:59.511 WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9090) could not be established. Broker may not be available.
2020-05-22 16:23:59.512 WARN 21162 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9090 (id: -1 rack: null) disconnected
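A note on the `bootstrap.servers = [127.0.0.1:0]` line in the first log: port 0 is not a port anything can listen on. When a server binds to port 0, the OS assigns a free port, and clients have to connect to that assigned port. My guess (an assumption, not something I have confirmed) is that the consumer was handed the broker's placeholder port rather than a port the broker actually bound. The JDK-only sketch below illustrates the difference; it is not part of my test, and the class and method names (`PortZeroDemo`, `canConnect`, `ephemeralListenerReachable`) are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper, JDK only; it does not use Kafka or Spring at all.
class PortZeroDemo {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // The exact exception type depends on the OS; any I/O failure
            // is treated as "not reachable" here.
            return false;
        }
    }

    /** Binds a listener to port 0, then connects to the port the OS really assigned. */
    static boolean ephemeralListenerReachable() {
        try (ServerSocket server = new ServerSocket(0)) {
            int assignedPort = server.getLocalPort(); // non-zero once bound
            return assignedPort > 0 && canConnect("127.0.0.1", assignedPort, 500);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("port 0 reachable: " + canConnect("127.0.0.1", 0, 500));
        System.out.println("assigned port reachable: " + ephemeralListenerReachable());
    }
}
```

If that guess is right, the thing to check is whether the embedded broker was actually started before `KafkaTestUtils.consumerProps(...)` read its address, rather than which port number is configured.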
I am using org.springframework.kafka:spring-kafka-test:2.5.0.RELEASE. If you have run into this problem and solved it, please let me know.
Thanks