2

我有以下情况:

  • 多线程应用程序
  • 无法控制线程创建。这是由框架管理的(在本例中为 celery)
  • 我有一些对象实例化起来很昂贵,而且不是线程安全的。使它们线程安全不是一种选择。
  • 对象可以在多个地方实例化,但是如果我在一个已经定义的线程中重新实例化同一个对象,则应该重用该对象。

我想出了以下模式:

#!/usr/bin/env python

import threading
import time

class MyObj1:
    def __init__(self, name):
        self.name = name

local = threading.local()
def get_local_obj(key, create_obj, *pars, **kwargs):
    d = local.__dict__
    if key in d: obj = d[key]
    else       :
        obj = create_obj(*pars, **kwargs)
        d[key] = obj
    return obj

class Worker(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        myobj1 = get_local_obj('obj1', MyObj1, (self.name))
        for _ in xrange(3):
            print myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(2)]
    for t in ths : t.start()

test()

在这里,我自己创建线程,因为这只是一个测试,但如前所述,在实际应用程序中,我无法控制线程。

我感兴趣的是功能get_local_obj。我有几个问题:

  1. 这个逻辑会保证对象不在线程之间共享吗?
  2. 这个逻辑会保证对象在一个线程中不会被多次实例化吗?
  3. 这个内存会泄漏吗?
  4. 您对这种方法有什么一般性意见吗?对于上面建议的场景有什么更好的建议吗?

编辑

澄清一下:我的应用程序是多线程的,但不是我在创建线程。我只是在创建一些对象,它们恰好在框架创建的线程中运行。我的一些对象不是线程安全的,所以我只需要为每个线程创建一次。因此get_my_object

编辑

local = threading.local() 必须在全局范围内定义。

4

3 回答 3

1

这个如何?

class Worker (Thread):
  def __init__(self):
    super(Worker,self).__init__()
    self.m_local = threading.local()

  def get_my_obj(self):
    try:
      obj = self.m_local.my_object
    except AttributeError:
      self.m_local.my_object = create_object()
      obj = self.m_local.my_object
    return obj

  def run(self):
    my_obj = self.get_my_obj()
    # ...

最后,它与您的示例相似,只是更清洁。您将所有线程特定的代码保存在一个地方,run函数“不知道”关于初始化的任何内容,它my_obj使用 getter,而 getter 只创建一次对象。threading.local将保证数据是特定于线程的——这就是它的工作。

我看不出那里有任何内存泄漏的原因。最后,你需要出一点汗才能得到 python 中的泄漏:)

于 2012-12-21T14:15:02.193 回答
1

FWIW,这是您的代码的修改版本,根据一个答案另一个相关问题进行了一些简化。不过,它仍然是基本相同的模式。

#!/usr/bin/env python
import threading
import time
threadlocal = threading.local()

class MyObj1(object):
    def __init__(self, name):
        print 'in MyObj1.__init__(), name =', name
        self.name = name

def get_local_obj(varname, factory, *args, **kwargs):
    try:
        return getattr(threadlocal, varname)
    except AttributeError:
        obj = factory(*args, **kwargs)
        setattr(threadlocal, varname, obj)
        return obj

class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()

    def run(self):
        myobj1 = get_local_obj('obj1', MyObj1, self.name)
        for _ in xrange(3):
            print myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(3)]
    for t in ths:
        t.start()

test()

实际上,没有 a 也可以做完全相同的事情get_local_obj()

#!/usr/bin/env python
import threading
import time
threadlocal = threading.local()

class MyObj1(object):
    def __init__(self, name):
        print 'in MyObj1.__init__(), name =', name
        self.name = name

class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()

    def run(self):
        threadlocal.myobj1 = MyObj1(self.name)
        for _ in xrange(3):
            print threadlocal.myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(3)]
    for t in ths:
        t.start()

test()
于 2012-12-21T23:24:49.140 回答
0

Here's a another different answer that utilizes an idea I had of having thread-level singletons. It gets completely rid of your get_local_obj() function. I haven't done a lot of testing, but so far it seems to work. It may be more than you want because it literally implements what you said you wanted in your last bullet-point:

  • The objects can be instantiated in multiple places, but if I am reinstantiating the same object in one thread where it has already been defined, the object should be reused.

#!/usr/bin/env python
import threading
import time
threadlocal = threading.local()

class ThreadSingleton(type):
    # called when instances of client classes are created
    def __call__(cls, *args, **kwargs):
        instances = threadlocal.__dict__.setdefault(cls.__name__+'.instances', {})
        if cls not in instances:
            instances[cls] = super(ThreadSingleton, cls).__call__(*args, **kwargs)
        return instances[cls]

class MyClass(object):
    __metaclass__ = ThreadSingleton
    def __init__(self, name):
        print 'in MyClass.__init__(), name =', name
        self.name = name

class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()

    def run(self):
        myobj1 = MyClass(self.name)
        for _ in xrange(3):
            print 'myobj1.name:', myobj1.name
            myobj2 = MyClass(self.name+'#2') # this returns myobj1
            print 'myobj2.name:', myobj2.name # so this prints myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(3)]
    for t in ths:
        t.start()

test()
Note that the output will be somewhat jumbled as it is generated by the different threads. This can be fixed, but I decided not to complicate the essence of this answer by adding it.
于 2012-12-22T10:46:38.323 回答