0

我正在调用一个从 kafka 生产者发送一些数据的函数,但是在它发送之后我返回了一个不返回的响应。代码在返回时卡住了。有人知道发生了什么吗?

我的代码如下,

def postEvent(eventData):
    print("The eventData is...",eventData)
    timestamp = datetime.now().__format__("%Y-%m-%d %H:%M:%S")
    try:
        producer = KafkaProducer(bootstrap_servers=["host:port"])
        data = json.dumps(eventData).encode('utf-8')
        try:

            kafkaResponse = producer.send('streamTest', data)

            response ={'time': str(timestamp), 'kafkaResponse':kafkaResponse.get(), 
                       'postResult': 'true'}
            print('kafaka response is...', response)
        except ConnectionAbortedError:
                response ={'time': str(timestamp), 'postResult': 'false'}
        except kafka.errors.KafkaTimeoutError:
                response ={'time': str(timestamp), 'postResult': 'false'}
        print('kafaka response is...', response)
        return response
    except kafka.errors.NoBrokersAvailable:
        response = {'Response':'Kafka Errors... NoBrokersAvailable'}
        print('kafaka response ', response)
        return response
4

1 回答 1

0

从你的问题中不清楚return它挂在哪个声明上。

我测试了您的代码,它与 Kafka 0.10.0.1 代理和kafka-python1.3.5 完美配合。

这可能是 Kafka 集群网络问题,因此您可能会挂起的两个地方是:1.kafkaResponse.get()在等待Future解决 2. 没有可用的代理,而代理超时。如果您传入多个代理,请记住,它们需要每次超时才能引发NoBrokersAvailable错误。

于 2017-10-13T19:18:39.480 回答