I am integrating my application with spring-kafka (not spring-integration-kafka). Here is the Spring documentation for the project: http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsingle
My producer works perfectly, but the consumer is not consuming any messages. Any pointers?
Here is my configuration:
    @EnableKafka
    public class MyConfig {

        @Value("${kafka.broker.list}") // comma-separated list of servers, each as server:port
        private String kafkaBrokerList;

        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Message>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(12);
            factory.getContainerProperties().setPollTimeout(3000);
            factory.getContainerProperties().setIdleEventInterval(60000L);
            factory.setAutoStartup(Boolean.TRUE);
            factory.setMessageConverter(new StringJsonMessageConverter());
            return factory;
        }

        @Bean
        public ConsumerFactory<Integer, Message> consumerFactory() {
            JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class);
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new IntegerDeserializer(), messageJsonDeserializer);
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
            props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 60000);
            props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
            return props;
        }

        @KafkaListener(topics = "myTopic", containerFactory = "kafkaListenerContainerFactory")
        public void listen(@Payload Message message) {
            System.out.println(message);
        }
    }
** Edited with more information **
Thanks, Gary, for the response. I don't see any exceptions in the log. I also tried KafkaTemplate
with a similar configuration, and I am able to publish messages to the topic, but still no luck with the consumer. I am changing the code to use String instead of my Message object. Here are parts of the log:
2016-07-11 09:31:43.314 INFO [RMI TCP Connection(2)-127.0.0.1] o.a.k.c.c.ConsumerConfig [AbstractConfig.java:165] ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id =
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [app1.qa:9092, app1.qa:9093, app2.qa:9092, app2.qa:9093, app3.qa:9092, app3.qa:9093]
retry.backoff.ms = 10000
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 60000
ssl.truststore.password = null
session.timeout.ms = 15000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 10000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = latest
I also see the following in the log:
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-11] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-8] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
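One thing that stands out in the ConsumerConfig dump above is that group.id is empty, and consumers that subscribe to a topic normally need a non-empty group id to join a consumer group. That may be related to the "Marking the coordinator ... dead" and empty "partitions revoked" messages, though this is only a guess from the log. Below is a minimal sketch of the consumer properties with a group id added. The group name "myGroup" and the class name are hypothetical placeholders, and plain string keys are used instead of the ConsumerConfig constants only to keep the snippet free of dependencies ("group.id" is the key behind ConsumerConfig.GROUP_ID_CONFIG).

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    // Builds a subset of the properties from consumerConfigs() above,
    // plus a group.id, which the original configuration never sets.
    static Map<String, Object> consumerConfigs(String brokerList, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId); // assumption: any non-empty, stable name
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 10000);
        props.put("session.timeout.ms", 15000);
        return props;
    }

    public static void main(String[] args) {
        // "myGroup" is a placeholder; the broker address is taken from the log above.
        Map<String, Object> props = consumerConfigs("app1.qa:9092", "myGroup");
        System.out.println(props.get("group.id"));
    }
}
```

In the actual MyConfig class this would presumably amount to one extra line in consumerConfigs(), something like props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup"), with a group name of your choosing.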