-1

这个问题与我在这里问过的其他问题有关,主要是关于在内存中对大量数据进行排序。

基本上这就是我想要/拥有的:

Twisted XMLRPC 服务器正在运行。该服务器在内存中保留了几个 (32) Foo 类的实例。每个 Foo 类都包含一个列表栏(将包含数百万条记录)。有一个服务从数据库中检索数据,并将其传递给 XMLRPC 服务器。数据基本上是一个字典,键对应于每个 Foo 实例,值是字典列表,如下所示:

data = {'foo1':[{'k1':'v1', 'k2':'v2'}, {'k1':'v1', 'k2':'v2'}], 'foo2':...}

然后每个 Foo 实例被传递对应于它的键的值,并且 Foo.bar 字典被更新和排序。

class XMLRPCController(xmlrpc.XMLRPC):

    def __init__(self):
        ...
        self.foos = {'foo1':Foo(), 'foo2':Foo(), 'foo3':Foo()}
        ...

    def update(self, data):
        for k, v in data:
            threads.deferToThread(self.foos[k].processData, v)

    def getData(self, fookey):
        # return first 10 records of specified Foo.bar
        return self.foos[fookey].bar[0:10]

class Foo():

    def __init__(self):
        bar = []

    def processData(self, new_bar_data):
        for record in new_bar_data:
            # do processing, and add record, then sort
            # BUNCH OF PROCESSING CODE
            self.bar.sort(reverse=True)

问题是,当在 XMLRPCController 中调用带有大量记录(比如 100K +)的更新函数时,它会停止响应我的 getData 调用,直到所有 32 个 Foo 实例都完成了 process_data 方法。我认为 deferToThread 会起作用,但我认为我误解了问题所在。

任何建议......如果它支持这种必需的行为,我愿意使用其他东西,比如 Cherrypy。


编辑

@Troy:这就是反应堆的设置方式

reactor.listenTCP(port_no, server.Site(XMLRPCController)
reactor.run()

就 GIL 而言,将 sys.setcheckinterval() 值更改为更小的值是否是一个可行的选择,以便释放对数据的锁定以便可以读取它?

4

2 回答 2

1

让应用程序响应的最简单方法是将 CPU 密集型处理分解成更小的块,同时让扭曲的反应器在其间运行。例如通过调用 reactor.callLater(0, process_next_chunk) 前进到下一个块。自己有效地实现协同多任务。

另一种方法是使用单独的进程来完成工作,然后您将受益于多个内核。看看 Ampoule:https : //launchpad.net/ampoule 提供了一个类似于 deferToThread 的 API。

于 2010-02-04T20:23:33.027 回答
0

我不知道您的 processData 方法运行了多长时间,也不知道您是如何设置扭曲反应器的。 默认情况下,twisted reactor 有一个 0 到 10 个线程的线程池。您可能试图将多达 32 个长时间运行的计算推迟到多达 10 个线程。这是次优的。

您还需要询问 GIL 在更新所有这些集合中扮演什么角色。

编辑:在对程序进行任何重大更改(例如调用sys.setcheckinterval())之前,您可能应该使用分析器或 python 跟踪模块运行它。这些应该告诉您一直在使用哪些方法。没有正确的信息,您就无法做出正确的更改。

于 2010-02-04T19:06:51.283 回答