1

我正在使用 Python 无限期地向 Google PubSub 发布消息。我正在分批发布消息。我已将 batch_size 设置为 20。我在 5 个进程中发布消息。

我在 GCE 中使用 1 个 CPU 和 3.75 内存执行了下面的脚本。运行一个多小时后,GCE 内存不足,发布者崩溃​​。

import json

from multiprocessing import Process
from google.cloud import pubsub

_PROJECT = 'google-project-id'
_TOPIC = 'sample'
_PROCESS_COUNT = 5


def publish(message_str, ps_batch):
    while True:
        ps_batch.publish(message_str)

def main():
    json_message = {'id': 1,'first_name': 'Karim','last_name': 'Bourdice','email': 'kbourdice0@purevolume.com','gender': 'Male','ip_address': '76.108.207.137','field1': '1MCwggd9g267ZBfq8p6KUR8RzXh1UARcBM','field2': '14MbkGizhAuYY9fjzA7UQ1x5sHfPEearMy','field3': '1BC7imHVSwT2xKmZBE6xVwGExXte4M87L7','field4': '1H2CPcuv9JvmnKV3Q9zxtZqZkvn86yyqFz','field5': '1GkanWfbWXxmriDt4vdDq5pb2wySL1nDiZ','field6': '1VKh8m3VcEt5xkfqj9BcCJfuL5nVc3nKZ','field7': '1NuruyCxMeL6YM8ouzD9aqAqFDfHzb6pak','field8': '1DB3Ac7uR5NJziSfJVLM1G2F6tW27SkZqN','field9': '1LdM4nGXPQkSKPTLkZFTbKHN9sAUpBGtSf','field10': '1LLCjdpVYy97u7kBdhHJcgycAZbdZ3R1xF','field11': '16FaG4qFomWMmv4Zv4F8MMUBkcJcEHHijY','field12': '1PXkwtCc4CWSp8BkecAvQubMs12GCvqdVu','field13': '1BqnENPGGDXn9S1FHn2pvwLtDDYu1QTjHL','field14': '1KckpoksBagHxANikRrRBC428srpxAqY5C','field15': '17Z3yPdFgkxEn29JaT89HshvVF4RbtMsMq','field16': '14QpaoakehbRnckiQd7Zte2nSRRGX695Wk','field17': '1wGqgYDSmfE9Q1ZH67GwLqNQmH2u7viRs','field18': '134KjLvr8ugwZK19wzL3V9i8fvmaBBjHQM','field19': '1JrUeWAvufRHXbMLSQeGLkjHJNx3XFM77A','field20': '19Botyc2ikY7V9XqRkJbnoJHWk2eVRxni5','field29': '1DjHXt1yecWxjuNPDMQDaAiCjBvDoEwZr1','field21': '1AZejYQWuuQi9DsGp27tHt7PdCRyBtLD8L','field22': '1AzK3J6vwwh8Xog5TJA48Uh223m5LwqRxz','field23': '12rSBAhFvgFMWbssYmSF3zCpDxmpSPxJV6','field24': '1C1QxVG8QphiRD2uzF6Mg86vRdwTARXpeW','field25': '1GRCqpUZ3oejbic59z16XWUYbw8GYdiq6f','field26': '1CH2yzHjehfuyVW9vw9NPTWX3yW5v2vTFg','field27': '1CCoa5aQPJ4Ya1FDKWrMceRReyptKcWV1N','field28': '1LJSmvzGWwDBLc1YTPaT2k1uAkiX8GjGpz'}
    message_str = json.dumps(json_message)
    batch_size = 20

    ps = pubsub.Client()
    ps_topic = ps.topic(_TOPIC)
    ps_batch = ps_topic.batch(max_messages = batch_size)
    print("Pushing messages to {0}".format(_TOPIC))

    processes = []
    for i in range(1, _PROCESS_COUNT):
        p = Process(target=publish, args=(message_str, ps_batch))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

这是 stackdriver gce-memory-used 图表。 在此处输入图像描述

运行上述 python 脚本后,您可以看到 memory-used 稳步增加。最后它崩溃了。请注意,我在这个 GCE 中单独运行这个 python 脚本,而不是任何其他应用程序

这个问题有什么解决方法吗?我怎样才能避免这里的内存泄漏?

4

3 回答 3

1

查看代码,我很确定问题出在这里: https ://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/pubsub/google/cloud/pubsub/topic.py#L550

您有一个批量发布消息一遍又一遍,但返回的 message_ids 只是无限期地添加到此列表中,直到您 OOM。

这是一个错误。batch.message_ids:-) 您可以通过每隔一段时间设置一个空列表来解决它。

于 2017-06-14T17:58:57.703 回答
1

与其清除batch.message_ids自己,batch.commit()不如不时调用(而不是纯粹依赖“计数器翻转时的隐式提交”语义)。

于 2017-06-14T21:42:56.063 回答
0

正如 Luke 和 Tres 所提到的,这是他们同意修复的 Google Python 客户端的一个问题。

参考https://issuetracker.google.com/u/0/issues/62606697

于 2017-08-22T06:27:23.507 回答