9

在花了很多时间试图围绕多处理进行思考之后,我想出了这段代码,它是一个基准测试:

示例 1:

from multiprocessing  import Process

class Alter(Process):
    def __init__(self, word):
        Process.__init__(self)
        self.word = word
        self.word2 = ''

    def run(self):
        # Alter string + test processing speed
        for i in range(80000):
            self.word2 = self.word2 + self.word

if __name__=='__main__':
    # Send a string to be altered
    thread1 = Alter('foo')
    thread2 = Alter('bar')
    thread1.start()
    thread2.start()

    # wait for both to finish

    thread1.join()
    thread2.join()

    print(thread1.word2)
    print(thread2.word2)

这在 2 秒内完成(多线程时间的一半)。出于好奇,我决定接下来运行这个:

示例 2:

word2 = 'foo'
word3 = 'bar'

word = 'foo'
for i in range(80000):
    word2 = word2 + word

word  = 'bar'
for i in range(80000):
    word3 = word3 + word

print(word2)
print(word3)

令我惊恐的是,这不到半秒就跑完了!

这里发生了什么?我希望多处理运行得更快 - 鉴于示例 1 是示例 2 分为两个进程,它不应该在示例 2 的一半时间内完成吗?

更新:

在考虑了 Chris 的反馈后,我已经包含了消耗最多处理时间的“实际”代码,并引导我考虑多处理:

self.ListVar = [[13379+ strings],[13379+ strings],
                [13379+ strings],[13379+ strings]]

for b in range(len(self.ListVar)):
    self.list1 = []
    self.temp = []
    for n in range(len(self.ListVar[b])):
        if not self.ListVar[b][n] in self.temp:
            self.list1.insert(n, self.ListVar[b][n] + '(' + 
                              str(self.ListVar[b].count(self.ListVar[b][n])) +
                              ')')
           self.temp.insert(0, self.ListVar[b][n])

   self.ListVar[b] = list(self.list1)
4

4 回答 4

13

此示例太小而无法从多处理中受益。

启动新流程时会有很多开销。如果涉及繁重的处理,则可以忽略不计。但是您的示例实际上并没有那么密集,因此您一定会注意到开销。

您可能会注意到与真实线程的更大差异,太糟糕的 python(好吧,CPython)在 CPU 绑定线程方面存在问题。

于 2012-01-08T04:56:41.537 回答
13

多处理可能对您正在做的事情有用,但对您考虑使用它的方式没有用。由于您基本上是在对列表的每个成员进行一些计算,因此您可以使用该multiprocessing.Pool.map方法对列表成员进行并行计算。

这是一个示例,它显示了使用单个进程并使用的代码性能multiprocessing.Pool.map

from multiprocessing import Pool
from random import choice
from string import printable
from time import time

def build_test_list():
    # Builds a test list consisting of 5 sublists of 10000 strings each.
    # each string is 20 characters long
    testlist = [[], [], [], [], []]
    for sublist in testlist:
        for _ in xrange(10000):
            sublist.append(''.join(choice(printable) for _ in xrange(20)))
    return testlist

def process_list(l):
    # the time-consuming code
    result = []
    tmp = []
    for n in range(len(l)):
        if l[n] not in tmp:
            result.insert(n, l[n]+' ('+str(l.count(l[n]))+')')
            tmp.insert(0, l[n])
    return result

def single(l):
    # process the test list elements using a single process
    results = []
    for sublist in l:
        results.append(process_list(sublist))
    return results

def multi(l):
    # process the test list elements in parallel
    pool = Pool()
    results = pool.map(process_list, l)
    return results

print "Building the test list..."
testlist = build_test_list()

print "Processing the test list using a single process..."
starttime = time()
singleresults = single(testlist)
singletime = time() - starttime

print "Processing the test list using multiple processes..."
starttime = time()
multiresults = multi(testlist)
multitime = time() - starttime

# make sure they both return the same thing
assert singleresults == multiresults

print "Single process: {0:.2f}sec".format(singletime)
print "Multiple processes: {0:.2f}sec".format(multitime)

输出:

Building the test list...
Processing the test list using a single process...
Processing the test list using multiple processes...
Single process: 34.73sec
Multiple processes: 24.97sec
于 2012-01-08T09:04:00.320 回答
12

ETA:既然您已经发布了代码,我可以告诉您有一种简单的方法可以更快地完成您正在做的事情(快 100 倍以上)。

我看到您正在做的是在括号中为字符串列表中的每个项目添加一个频率。您可以创建一个字典,将每个元素映射到其频率,而不是每次都计算所有元素(您可以使用 cProfile 确认,这是迄今为止代码中最大的瓶颈) 。这样,您只需要浏览列表两次——一次创建频率字典,一次使用它来添加频率。

在这里,我将展示我的新方法、计时,并使用生成的测试用例将其与旧方法进行比较。测试用例甚至显示新结果与旧结果完全相同注意:您真正需要注意的是 new_method。

import random
import time
import collections
import cProfile

LIST_LEN = 14000

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t


def random_string(length=3):
    """Return a random string of given length"""
    return "".join([chr(random.randint(65, 90)) for i in range(length)])


class Profiler:
    def __init__(self):
        self.original = [[random_string() for i in range(LIST_LEN)]
                            for j in range(4)]

    def old_method(self):
        self.ListVar = self.original[:]
        for b in range(len(self.ListVar)):
            self.list1 = []
            self.temp = []
            for n in range(len(self.ListVar[b])):
                if not self.ListVar[b][n] in self.temp:
                    self.list1.insert(n, self.ListVar[b][n] + '(' +    str(self.ListVar[b].count(self.ListVar[b][n])) + ')')
                    self.temp.insert(0, self.ListVar[b][n])

            self.ListVar[b] = list(self.list1)
        return self.ListVar

    def new_method(self):
        self.ListVar = self.original[:]
        for i, inner_lst in enumerate(self.ListVar):
            freq_dict = collections.defaultdict(int)
            # create frequency dictionary
            for e in inner_lst:
                freq_dict[e] += 1
            temp = set()
            ret = []
            for e in inner_lst:
                if e not in temp:
                    ret.append(e + '(' + str(freq_dict[e]) + ')')
                    temp.add(e)
            self.ListVar[i] = ret
        return self.ListVar

    def time_and_confirm(self):
        """
        Time the old and new methods, and confirm they return the same value
        """
        time_a = time.time()
        l1 = self.old_method()
        time_b = time.time()
        l2 = self.new_method()
        time_c = time.time()

        # confirm that the two are the same
        assert l1 == l2, "The old and new methods don't return the same value"

        return time_b - time_a, time_c - time_b

p = Profiler()
print p.time_and_confirm()

当我运行它时,它得到 (15.963812112808228, 0.05961179733276367) 的时间,这意味着它快了大约 250 倍,尽管这个优势取决于列表的长度和每个列表中的频率分布。我相信您会同意,凭借这种速度优势,您可能不需要使用多处理 :)

(我的原始答案留在下面以供后代使用)

ETA:顺便说一下,值得注意的是,这个算法在列表的长度上大致是线性的,而你使用的代码是二次的。这意味着元素数量越多,它的性能优势就越大。例如,如果将每个列表的长度增加到 1000000,则只需 5 秒即可运行。根据推断,旧代码将占用一天时间 :)


这取决于您正在执行的操作。例如:

import time
NUM_RANGE = 100000000

from multiprocessing  import Process

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t

def multi():
    class MultiProcess(Process):
        def __init__(self):
            Process.__init__(self)

        def run(self):
            # Alter string + test processing speed
            for i in xrange(NUM_RANGE):
                a = 20 * 20

    thread1 = MultiProcess()
    thread2 = MultiProcess()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

def single():
    for i in xrange(NUM_RANGE):
        a = 20 * 20

    for i in xrange(NUM_RANGE):
        a = 20 * 20

print timefunc(multi) / timefunc(single)

在我的机器上,多处理操作只占用单线程操作的 60% 左右的时间。

于 2012-01-08T05:34:04.463 回答
0

这个线程非常有用!

只需快速观察上面David Robinson提供的第二个代码(2012 年 1 月 8 日 5:34 回答),该代码更适合我当前的需求。

在我的情况下,我以前记录了没有多处理的目标函数的运行时间。当使用他的代码实现一个多处理函数时,他的 timefunc(multi) 并没有反映 multi 的实际时间,而是似乎反映了在父级中花费的时间。

我所做的是将计时功能外部化,并且我得到的时间看起来更像预期:

 start = timefunc()
 multi()/single()
 elapsed = (timefunc()-start)/(--number of workers--)
 print(elapsed)

在我使用双核的情况下,“x”工作人员使用目标函数执行的总时间比使用“x”迭代在目标函数上运行简单的 for 循环快两倍。

我是多处理的新手,所以请谨慎对待这个观察。

于 2014-05-27T09:58:55.070 回答