我正在尝试部署 2 个 Spring Boot 应用程序(kafka Producer 和 Consumer)。当我将 Producer 部署到 docker 时一切正常,但是当我部署时,我的 Consumer 不起作用,因为没有与 kafka 容器的连接。
日志告诉我这个错误
2019-11-17 05:32:22.644 WARN 1 --- [main] o.a.k.c.NetworkClient: [Consumer clientId=consumer-1, groupId=exampleGroup] Connection to node -1 could not be established. Broker may not be available.
我的 docker-compose.yml 是
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic1:1:1"
在我的 KafkaConfig 课程中:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaConstants.ENABLE_AUTO_COMMIT_CONFIG);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);
// config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, KafkaConstants.SESSION_TIMEOUT_MS);
return new DefaultKafkaConsumerFactory<>(config);
}
和常量类
public class KafkaConstants {
public static String KAFKA_BROKERS = "localhost:9092";
public static Integer MESSAGE_COUNT=1000;
public static String TOPIC_NAME="demo";
public static String GROUP_ID_CONFIG="exampleGroup";
public static Integer MAX_NO_MESSAGE_FOUND_COUNT=100;
public static String OFFSET_RESET_LATEST="latest";
public static String OFFSET_RESET_EARLIER="earliest";
public static Integer MAX_POLL_RECORDS=1;
public static Integer SESSION_TIMEOUT_MS = 180000;
public static Integer REQUEST_TIMEOUT_MS_CONFIG = 181000;
public static String ENABLE_AUTO_COMMIT_CONFIG = "false";
public static Integer AUTO_COMMIT_INTERVAL_MS_CONFIG = 8000;
}
当我在我的计算机上安装 zookepper 和 kafka 并使用 intellij 运行这 2 个 spring boot 应用程序时工作正常。问题是当我部署到本地 docker 时。
你能帮我么?
更新
更新我的 docker-compose:
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "ACC_GROUP_CREATE:1:1"
consumer:
image: micro1
container_name: micro1
depends_on:
- kafka
restart: always
ports:
- 8088:8088
depends_on:
- kafka
links:
- kafka:kafka
producer:
image: micro2
container_name: micro2
depends_on:
- kafka
restart: always
ports:
- 8087:8087
depends_on:
- kafka
links:
- kafka:kafka
工作正常!基于@hqt 的响应,但我不知道为什么需要添加这些消费者/生产者行