2

Due to one reason or another, we recently re-wrote our consumers and producers using different libraries than initially written in. However, I've been having some issues after the switch.

Using Kafka 0.9.0.2:
8 partitions

  • We consume from one topic, process the message, and push to another topic.

Due to extensive processing causing session timeouts before being able to commit offsets, the following config options were updated:
Consumer: session.timeout.ms: 1m40s
Broker: group.max.session.timeout.ms: 2m and group.min.session.timeout.ms: 6s

The issue I'm having is that once every several startups of my Consumer, it seems to hang while trying to fetch messages.
No errors or exceptions, it doesn't eventually timeout, it just sits. Considering my process which implements the consumer restarts every so often, this is a breaking problem, and I'm out of ideas. I don't know if it's a config update that needs to be made, or if I'm not handling shutdown properly, which is causing some sort of timeout to be exceeded on the broker.

    consumer = KafkaConsumer(
        self.topic,
        bootstrap_servers=self.host,
        group_id=self.group,
        enable_auto_commit=False,
        max_partition_fetch_bytes=100*1048576,
        auto_offset_reset='earliest',
        session_timeout_ms=100000)

    print 'Consumer created'
    for message in consumer:
        print message.offset
        print message.value
  1. I add 100,000 messages to Kafka.
  2. I turn on this service and allow consumption of ~1000 messages.
  3. I restart the process to simulate what's happening.
  4. The process hangs, output: Consumer created

Has anyone seen their consumers hang on startup?
Is there anything I should look for in Kafka logs?

I'm also noticing consumption from the partitions to be increasingly slow.

4

0 回答 0