
I have a file with a lot of data, one record per line, and I'm trying to do some ETL work over the whole file. Right now I read the data line by line from standard input; the nice part is that the script composes very flexibly with other scripts and shell commands. I write the results to standard output. For example:

$ cat input_file
line1 
line2
line3
line4
...

My current Python code, parse.py, looks like this:

import sys

for line in sys.stdin:
    result = ETL(line)    # ETL is some self-defined function which takes a while to execute
    print(result)

This is how it runs at the moment:

cat input_file | python parse.py > output_file

I've looked at Python's threading module, and I'm wondering whether performance would improve significantly if I used it.

Question 1: How should I decide the quota of lines for each thread, and why?

...
counter = 0
buffer = []
for line in sys.stdin:
    buffer.append(line)
    counter += 1
    if counter % 5 == 0:   # maybe assign 5 lines to each thread? if not, is there a rule of thumb to determine it?
        thread = parser(buffer)
        buffer = []
        thread.start()

Question 2: Multiple threads may print their results back to stdout at the same time. How should I organize them to avoid the situation below?

import threading
import time

class parser(threading.Thread):
    def __init__(self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            print(elem + 'Finished')

work = ['a', 'b', 'c', 'd', 'e', 'f']

thread1 = parser(['a', 'b'])  
thread2 = parser(['c', 'd'])
thread3 = parser(['e', 'f'])

thread1.start()
thread2.start()
thread3.start()   

The output is really ugly; a single line can contain output from two threads:

aFinished
cFinishedeFinished

bFinished
fFinished
dFinished

2 Answers


Taking your second question first, this is exactly what mutexes are for. You can get the cleaner output you want by using a lock to coordinate among the parsers, ensuring that only one thread has access to the output stream at a time:

class parser(threading.Thread):
    output_lock = threading.Lock()

    def __init__(self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            with self.output_lock:
                print(elem + 'Finished')

As for your first question, note that multithreading may not bring any benefit for your particular workload. It depends heavily on whether the work you do on each input line (your ETL function) is mainly CPU-bound or IO-bound. If it's the former (which I suspect is likely), threads won't help, because of the global interpreter lock. In that case, you'd want to use the multiprocessing module to distribute the work among multiple processes rather than multiple threads.
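A minimal sketch of that multiprocessing approach (the ETL stub here just upper-cases each line; the pool size and chunksize are illustrative, and the real function must be defined at module level so it can be pickled):

```python
import multiprocessing
import sys

def ETL(line):
    # Stand-in for the real CPU-bound transform.
    return line.strip().upper()

if __name__ == '__main__':
    # imap streams results back in input order as the workers finish.
    with multiprocessing.Pool(processes=4) as pool:
        for result in pool.imap(ETL, sys.stdin, chunksize=64):
            print(result)
```

This keeps the same `cat input_file | python parse.py > output_file` usage, but the lines are transformed in parallel across CPU cores.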

But you can get the same result with a workflow that is even easier to implement: split the input file into n parts (for example with the split command); run the extract-and-transform script on each part separately; then concatenate the resulting output files.
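A sketch of that split/process/concatenate pipeline (the chunk size and file names are illustrative, and `tr` stands in for `python parse.py` so the example is self-contained):

```shell
# Demo input; in practice input_file is your real data.
printf 'line1\nline2\nline3\nline4\nline5\n' > input_file

# 1. Split into chunks of 2 lines each: part_aa, part_ab, part_ac.
#    (Use a much larger line count for real data.)
split -l 2 input_file part_

# 2. Run the transform on each chunk in parallel; replace `tr`
#    with `python parse.py` for the real workload.
for f in part_*; do
    tr 'a-z' 'A-Z' < "$f" > "$f.out" &
done
wait

# 3. Concatenate the per-chunk outputs back together, in order.
cat part_*.out > output_file
```

Because split names the chunks in lexical order, the final `cat` glob reassembles the output in the original line order.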

One nitpick: "reading the data line by line from standard input so that the whole file isn't loaded into memory" involves a misconception. You can read a file line by line in Python, e.g. by replacing sys.stdin with a file object in the following construct:

for line in sys.stdin:

See also the readline() method of file objects, and note that read() takes the maximum number of bytes to read as an argument.
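For instance (a self-contained sketch that writes a tiny demo file first):

```python
# Create a tiny demo file so the example is self-contained.
with open('input_file', 'w') as f:
    f.write('line1\nline2\nline3\n')

# Iterating over a file object yields one line at a time,
# without loading the whole file into memory.
with open('input_file') as f:
    for line in f:
        print(line.strip())

# readline() returns a single line; read(n) caps the bytes read.
with open('input_file') as f:
    first = f.readline()   # 'line1\n'
    chunk = f.read(6)      # at most 6 more bytes: 'line2\n'
```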

answered 2013-08-21T06:03:24.610

Whether threading will be helpful to you is highly dependent on your situation. In particular, if your ETL() function involves a lot of disk access, then threading would likely give you a pretty significant speed improvement.

In response to your first question, I've always found that it just depends. There are a lot of factors at play when determining the ideal number of threads, and many of them are program-dependent. If you're doing a lot of disk access (which is pretty slow), for example, then you'll want more threads to take advantage of the downtime while waiting for disk access. If the program is CPU-bound, though, tons of threads may not be super helpful. So, while it may be possible to analyze all the factors to come up with an ideal number of threads, it's usually a lot faster to make an initial guess and then adjust from there.

More specifically, though, assigning a certain number of lines to each thread probably isn't the best way to go about divvying up the work. Consider, for example, if one line takes a particularly long time to process. It would be best if one thread could work away at that one line and the other threads could each do a few more lines in the meantime. The best way to handle this is to use a Queue. If you push each line into a Queue, then each thread can pull a line off the Queue, handle it, and repeat until the Queue is empty. This way, the work gets distributed such that no thread is ever without work to do (until the end, of course).
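A sketch of that queue-driven pattern (the ETL stub, input lines, and thread count are all illustrative):

```python
import queue
import threading

def ETL(line):
    # Stand-in for the real, slow per-line transform.
    return line.strip().upper()

def worker(task_q, results):
    while True:
        try:
            line = task_q.get_nowait()   # pull the next pending line
        except queue.Empty:
            return                       # no work left; thread exits
        results.append(ETL(line))
        task_q.task_done()

# All work goes into one shared queue up front.
task_q = queue.Queue()
for line in ['a\n', 'b\n', 'c\n', 'd\n']:
    task_q.put(line)

results = []
threads = [threading.Thread(target=worker, args=(task_q, results))
           for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

If one line takes much longer than the others, the thread handling it simply falls behind while the remaining threads keep draining the queue, so no worker sits idle.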

Now, the second question. You're definitely right that writing to stdout from multiple threads at once isn't an ideal solution. Ideally, you would arrange things so that the writing to stdout happens in only one place. One great way to do that is to use a Queue. If you have each thread write its output to a shared Queue, then you can spawn an additional thread whose sole task is to pull items out of that Queue and print them to stdout. By restricting the printing to just one thread, you'll avoid the issues inherent in multiple threads trying to print at once.
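A sketch of that single-printer arrangement (the sentinel object and the hard-coded worker chunks are illustrative):

```python
import queue
import threading

output_q = queue.Queue()
DONE = object()   # sentinel telling the printer to stop

def printer():
    # The only place that touches stdout, so lines never interleave.
    while True:
        item = output_q.get()
        if item is DONE:
            return
        print(item)

def worker(lines):
    for line in lines:
        output_q.put(line + 'Finished')   # hand results to the printer

printer_thread = threading.Thread(target=printer)
printer_thread.start()

workers = [threading.Thread(target=worker, args=(chunk,))
           for chunk in (['a', 'b'], ['c', 'd'], ['e', 'f'])]
for w in workers:
    w.start()
for w in workers:
    w.join()

output_q.put(DONE)    # all workers finished; shut the printer down
printer_thread.join()
```

Each worker only enqueues strings; the one printer thread serializes them onto stdout, so output like `cFinishedeFinished` can no longer occur.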

answered 2013-08-21T06:07:55.130