我们在 Python 3.6 上使用 pykafka 2.7.0 时遇到了问题：出于某种原因，有一些集成测试会在我们向其生产消息的主题上发生死锁，导致所有测试执行完毕后 nosetests 仍然无法退出。在 Python 2.7 上运行相同的测试则不会出现这种情况；在 Python 3.6 上改用 pykafka 2.8.0 也没有任何帮助。
唯一有效的办法是一个临时修复：在向主题生产消息后立即停止并删除生产者（见下面代码的最后两行），但停止生产者会花费大量时间。
def publish(self, topic, message):
    """Produce ``message`` to the Kafka topic named ``topic`` (case-insensitive).

    A producer is taken from ``self.producer_map`` (created with
    ``get_producer()`` and cached there on first use), used for a single
    ``produce`` call, then removed from the map and stopped.

    Why the producer is stopped here instead of being left cached: an
    un-stopped producer is eventually stopped by the garbage collector via
    ``Producer.__del__`` -> ``stop()``, which joins the producer's worker
    thread; that join from inside GC is where the process was observed to
    hang (``threading.Thread._wait_for_tstate_lock``). The stop runs in a
    ``finally`` so that a failing ``produce`` cannot leave a live producer
    behind for the GC to clean up.

    NOTE(review): stopping after every message is what makes this slow.
    Keeping producers cached and stopping all of them in an explicit
    teardown/close method would avoid both the GC-time stop and the
    per-message stop cost.

    :param topic: topic name; lower-cased, then encoded (UTF-8, the
        ``str.encode`` default) to look it up in the messenger's topics.
    :param message: payload passed unchanged to ``Producer.produce``.
    :raises: whatever the topic lookup, ``get_producer``, ``produce`` or
        ``stop`` raise. If ``produce`` raises, the producer is still removed
        from ``self.producer_map`` and stopped before the error propagates.
    """
    topic = topic.lower()
    self._log.info('publish to topic %s', topic)
    producer = self.producer_map.get(topic)
    if producer is None:
        k_topic = self.__messenger.topics[topic.encode()]
        self._log.info('%s', k_topic)
        producer = k_topic.get_producer()
        self.producer_map[topic] = producer
    try:
        producer.produce(message)
    finally:
        # Drop the cache entry first so that even if stop() raises, no
        # stopped/half-stopped producer is reused by the next publish.
        self.producer_map.pop(topic, None)
        producer.stop()
如果去掉最后两行，我在 gdb 中看到进程在垃圾回收期间卡住：垃圾回收触发了生产者的 __del__，进而尝试停止生产者，最终挂在 _wait_for_tstate_lock 上。这导致我们在 Jenkins 上的 CI 无法结束。我们想知道：为什么在垃圾回收时停止生产者会死锁，而在代码中显式调用时却不会？
#9 Frame 0x7f4824fe7958, for file /usr/lib64/python3.6/threading.py, line 1072, in _wait_for_tstate_lock (self=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <_io.TextIOWrapper at remote 0x7f48549b1708>)) at remote 0x7f48275474a8>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4824de3160>, block=True, timeout=-1, lock=<_thread.lock at remote 0x7f4824de78a0>)
elif lock.acquire(block, timeout):
#13 Frame 0x7f4824e045a0, for file /usr/lib64/python3.6/threading.py, line 1056, in join (self=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <redacted>, <redacted>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4824de3160>, timeout=None)
self._wait_for_tstate_lock()
#17 (frame information optimized out)
#21 Frame 0x7f484d8bee08, for file /usr/local/lib/python3.6/site-packages/pykafka/producer.py, line 235, in __del__ (self=<Producer(_cluster=<Cluster(_seed_hosts='<redacted>:9092', _socket_timeout_ms=30000, _offsets_channel_socket_timeout_ms=10000, _handler=<ThreadingHandler at remote 0x7f4824e81da0>, _brokers={0: <Broker(_connection=<BrokerConnection(_buff=<bytearray at remote 0x7f4824fdb228>, host=b'<redacted>', port=9092, _handler=<...>, _socket=<socket at remote 0x7f4824dff048>, source_host='', source_port=0, _wrap_socket=<function at remote 0x7f4824dfba60>) at remote 0x7f4824fd5898>, _offsets_channel_connection=None, _id=0, _host=b'<redacted>', _port=9092, _source_host='', _source_port=0, _ssl_config=None, _handler=<...>, _req_handler=<RequestHandler(handler=<...>, shared=<Shared at remote 0x7f4824dd0048>, t=<Thread(_target=<function at remote 0x7f4824dfbae8>, _name="1: pykafka.RequestHandler.worker for b'<redacted>':9092", _args=(), _kwargs={}, _daemonic=True, _ident=139948381951744, _tstate_lock=<_thread.lock at remote 0x7f4824de7120>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de71e8>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de71e8>, release=<built-in method release of _thread.lock object at remote 0x7f4824de71e8>, _waiters=<collections.deque at remote 0x7f4824e079a0>) at remote 0x7f4835420198>, _flag=True) at remote 0x7f4835424358>, _is_stopped=False, _initialized=True, _stderr=<redacted>, <redacted>, <redacted>, <_io.TextIOWrapper at remote 0x7f48549b1708>)) at remote 0x7f48275474a8>)) at remote 0x7f4827547518>)) at remote 0x7f48275476d8>) at remote 0x7f4835424940>) at remote 0x7f4824f62438>, _offsets_channel_req_handler=None, _socket_timeout_ms=30000, _offsets_channel_socket_timeout_ms=10000, _buffer_size=1048576, _req_handlers={}, _broker_version='0.9.0', _api_versions={0: <ApiVersionsSpec at remote 0x7f4824e88408>, 1: <ApiVersionsSpec at remote 0x7f4824e88458>, 2: <ApiVersionsSpec at remote 
0x7f4824e884a8>, 3: <ApiVersionsSpec at remote 0x7f4824e884f8>, 4: <ApiVersionsSpec at remote 0x7f4824e88548>, 5: <ApiVersionsSpec at remote 0x7f4824e88598>, 6: <ApiVersionsSpec at remote 0x7f4824e885e8>, 7: <ApiVersionsSpec at remote 0x7f4824e88638>, 8: <ApiVersionsSpec at remote 0x7f4824e88688>, 9: <ApiVersionsSpec at remote 0x7f4824e886d8>, 10: <ApiVersionsSpec at remote 0x7f4824e88728>, 11: <ApiVersionsSpec at remote 0x7f4824e88778>, 12: <ApiVersionsSpec at remote 0x7f4824e887c8>, 13: <ApiVersionsSpec at remote 0x7f4824e88818>, 14: <ApiVersionsSpec at remote 0x7f4824e88868>, 15: <ApiVersionsSpec at remote 0x7f4824e888b8>, 16: <ApiVersionsSpec at remote 0x7f4824e88908>}) at remote 0x7f482500f4e0>}, _topics=<TopicDict(_cluster=<weakref at remote 0x7f4824f9c4f8>, _exclude_internal_topics=True) at remote 0x7f4824fcd888>, _source_address='', _source_host='', _source_port=0, _ssl_config=None, _zookeeper_connect=None, _max_connection_retries=3, _max_connection_retries_offset_mgr=8, _broker_version='0.9.0', _api_versions={...}, controller_broker=None) at remote 0x7f4824ed8198>, _protocol_version=0, _topic=<Topic(_name=b'<redacted>', _cluster=<...>, _partitions={0: <Partition(_id=0, _leader=<...>, _replicas=[<...>], _isr=[<...>], _topic=<weakref at remote 0x7f4824e0cdb8>) at remote 0x7f48353ffbe0>}) at remote 0x7f4824deedd8>, _partitioner=<RandomPartitioner(idx=0) at remote 0x7f48526baba8>, _compression=0, _max_retries=3, _retry_backoff_ms=100, _required_acks=1, _ack_timeout_ms=10000, _max_queued_messages=100000, _min_queued_messages=70000, _linger_ms=5000, _queue_empty_timeout_ms=0, _block_on_queue_full=True, _max_request_size=1000012, _synchronous=False, _worker_exception=None, _owned_brokers={0: <OwnedBroker(producer=<weakproxy at remote 0x7f4824e0ce08>, broker=<...>, lock=<_thread.RLock at remote 0x7f4824fd8420>, flush_ready=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7698>, acquire=<built-in method acquire of _thread.lock object at remote 
0x7f4824de7698>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7698>, _waiters=<collections.deque at remote 0x7f4824ddc118>) at remote 0x7f4824de3320>, _flag=False) at remote 0x7f4824de33c8>, has_message=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7710>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7710>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7710>, _waiters=<collections.deque at remote 0x7f4824ddc0b0>) at remote 0x7f4824de3240>, _flag=True) at remote 0x7f4824de3390>, slot_available=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7788>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7788>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7788>, _waiters=<collections.deque at remote 0x7f4824ddc180>) at remote 0x7f4824de3208>, _flag=True) at remote 0x7f4824de3278>, queue=<collections.deque at remote 0x7f4824ddc1e8>, messages_pending=2, running=False, _auto_start=True, _queue_reader_worker=<Thread(_target=<function at remote 0x7f4824ef4ae8>, _name='3: pykafka.OwnedBroker.queue_reader for broker 0', _args=(...), _kwargs={}, _daemonic=True, _ident=139948465846016, _tstate_lock=<_thread.lock at remote 0x7f4824de78a0>, _started=<Event(_cond=<Condition(_lock=<_thread.lock at remote 0x7f4824de7670>, acquire=<built-in method acquire of _thread.lock object at remote 0x7f4824de7670>, release=<built-in method release of _thread.lock object at remote 0x7f4824de7670>, _waiters=<collections.deque at remote 0x7f4824ddc2b8>) at remote 0x7f4824de3128>, _flag=True) at remote 0x7f4824de31d0>, _is_stopped=False, _initialized=True, _stderr=<...>) at remote 0x7f4824de3160>) at remote 0x7f4824e81cc0>}, _delivery_reports=<_DeliveryReportNone(queue=None) at remote 0x7f4824e814a8>, _pending_timeout_ms=5000, _auto_start=True, _serializer=None, _running=True, _update_lock=<_thread.lock at remote 
0x7f4824de7620>) at remote 0x7f4824fdb208>)
self.stop()
#30 Garbage-collecting