1

我正在做一个项目,我有非常特定类型的日志,为了让这些日志正常工作,我创建了一个类,它基本上维护一个字典,记录执行的每个步骤所采取的时间和操作。后来用于多种分析目的。稍后我从该字典创建一个 JSON 文件并将其上传到 s3 存储桶。

当我只有一个主要流程时,一切正常。但是为了提高效率,我已经为特定任务包含了多处理。我正在生成 4 个调用相同函数的进程,并且我希望维护日志的类的对象由所有这些进程共享。

我已经完成了其他 StackOverflow 问题,但它们似乎都不起作用。如果您认为这在某个地方有更好的答案,请指导我,我可能已经跳过它。

class JsonLogs:
    def __init__(self, date, site_name, user_uuid, is_login=False, request_data=None):
        self.date = date
        self.attribute = value
        if is_login:
            ...
        else:
            ...


    def add_a_logs(self, process, message):
        self.log_dict['key']['some_key'].append(
            {
             'start_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
             'start_message': message,
             'end_message': None,
             'end_time': None,
             'duration': None

            }
        )

    def update_a_logs(self, process=None, message=None, response_data=None):
        log_obj = next((log for log in self.log_dict["key"]['some_key'] if log['process'] == process),
                       None)

        if response_data:
            self.log_dict['key']['some_key'] = response_data

        if log_obj:
            log_obj['end_time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
            log_obj['end_message'] = message
            td = datetime.datetime.now() - datetime.datetime.strptime(log_obj['start_time'], '%Y-%m-%d %H:%M:%S.%f')

            log_obj['duration'] = "{}".format(td.seconds)

    .....
    .....

还有其他类似的方法,然后将此字典转换为 JSON 并上传到 s3 存储桶中。这与 python 日志记录模块无关,因为我有一个非常具体的要求。

我需要这个类的对象由我正在创建的 4 个进程共享。

编辑

为了让人们更容易理解,我真正想要实现的目标可以通过下面给出的两个示例来理解。

单一进程:这行得通

from multiprocessing import Process


class A:
    def __init__(self, a):
        self.a = a

def a():
    x = A(10)
    b(x)
    print(x.a)

def b(y):
    y.a = 20

a()

输出:20

多处理:我想要实现的目标

from multiprocessing import Process


class A:
    def __init__(self, a):
        self.a = a


def a():
    x = A(10)
    p1 = Process(target=b, args=(x,))
    p2 = Process(target=b, args=(x,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # b(x)
    print(x.a)

def b(y):
    y.a = 20


a()

电流输出:10

我需要什么:20

4

1 回答 1

0

您需要使用共享内存在进程之间共享对象,因为每个进程都有自己的内存空间。您可以使用管理器对象共享对象。但最好避免共享复杂的对象,如指南所述。

或者,您可以使用队列对象。这是一个玩具示例:

import datetime
from multiprocessing import Process, Queue

def worker(q):
    for _ in range(5):
        q.put(
        {
            'start_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
            'start_message': 'message'
        })
    q.put(None) # End of the queue

def a():
    q = Queue()
    p1 = Process(target=worker, args=(q,))
    p2 = Process(target=worker, args=(q,))
    p1.start()
    p2.start()

    worker_end = 0
    while True:
        i = q.get()
        if i is None:
            worker_end += 1
        else:
            print(i)

        if worker_end == 2:
            break

    p1.join()
    p2.join()

a()
于 2020-03-06T10:13:53.263 回答