7

我有一个大的 XML 数据文件(>160M)要处理,看起来 SAX/expat/pulldom 解析是要走的路。我想要一个线程来筛选节点并将要处理的节点推送到队列中,然后其他工作线程将下一个可用节点从队列中拉出并处理它。

我有以下内容(它应该有锁,我知道 - 稍后会有)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

问题是while块的主体只被调用一次,然后我什至不能 ctrl-C 中断它。在较小的文件上,输出符合预期,但这似乎表明处理程序仅在文档完全解析时才被调用,这似乎违背了 SAX 解析器的目的。

我确定这是我自己的无知,但我不明白我在哪里犯了错误。

PS:我也尝试过start_handler这样的改变:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

然而,没有爱。

4

4 回答 4

8

ParseFile,正如您所注意到的,只是“吞下”所有内容——这对您想要做的增量解析没有好处!因此,只需一次将文件提供给解析器,确保在执行过程中有条件地将控制权交给其他线程——例如:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)

time.sleep(0.0)调用是 Python 的方式,表示“如果有其他线程准备就绪并等待,则让给其他线程”;该Parse方法记录在这里

第二点是,忘记这种用法的锁!-- 使用Queue.Queue代替,它本质上是线程安全的,并且几乎总是在 Python 中协调多个线程的最佳和最简单的方法。只需在其上创建一个Queue实例qq.put(name)并让工作线程阻塞q.get()等待获得更多工作要做 - 太简单了!

(您可以使用几种辅助策略来协调工作线程的终止,当它们没有更多工作要做时,但最简单的,没有特殊要求的,就是让它们成为守护线程,所以它们都会在主线程时终止线程确实 - 请参阅文档)。

于 2010-01-19T00:51:53.843 回答
8

我不太确定这个问题。我猜对 ParseFile 的调用是阻塞的,并且由于 GIL,只有解析线程正在运行。解决此问题的方法是multiprocessing改用。无论如何,它旨在与队列一起使用。

您制作 aProcess并且可以将其传递 a Queue

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

我已经包含了一个元素列表,只是为了复制您的原始脚本。您的最终解决方案可能会使用get_nowait和 aPool或类似的东西。

于 2010-01-19T00:35:06.743 回答
1

我看到的唯一错误是您正在q从不同的线程同时访问-确实在您编写时没有锁定。这是自找麻烦——你可能会遇到麻烦,因为 Python 解释器会锁定你。:)

试试加锁,真的不是很困难:

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

你看,这真的很简单,我们只是创建了一个锁变量来保护我们的对象,并在每次使用该对象之前获取该锁,并在每次完成对该对象的任务后释放该锁。这样我们保证q.append(name)永远不会与print(q).


(对于较新版本的 Python,还有一种“with ....”语法可以帮助您不释放锁或关闭文件或其他经常忘记的清理。)

于 2010-01-19T00:31:35.543 回答
0

我对实现不是很了解,但是如果解析是一直执行到完成的 C 代码,其他 Python 线程将不会运行。如果解析器回调 Python 代码,GIL 可能会被释放以供其他线程运行,但我不确定。您可能需要检查这些详细信息。

于 2010-01-19T00:37:38.203 回答