I have a Python program that parses 30,000+ files sequentially.
Is there a way to split this up into multiple threads (is that the correct term?) and parse chunks of those files at the same time? Say, 30 of them, each parsing 1,000 files.
This is easy.
You can create 30 threads explicitly and give each of them 1000 filenames.
But, even simpler, you can create a pool of 30 threads and have them service a queue with 30,000 filenames. That gives you automatic load balancing—if some of the files are much bigger than others, you won't have one thread finishing when another one's only 10% done.
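Just to make that concrete, here's a rough hand-rolled sketch of a pool of threads pulling filenames off a shared queue (not the recommended way—the executor shown below does all of this for you; parse_file and filenames are assumed to already exist in your program):

import queue
import threading

work = queue.Queue()
for name in filenames:
    work.put(name)

def worker():
    # Each thread keeps pulling filenames until the queue runs dry.
    while True:
        try:
            name = work.get_nowait()
        except queue.Empty:
            return
        parse_file(name)

threads = [threading.Thread(target=worker) for _ in range(30)]
for t in threads:
    t.start()
for t in threads:
    t.join()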
The concurrent.futures module gives you a nice way to execute tasks in parallel (including passing arguments to the tasks and receiving results, or even exceptions if you want). If you're using Python 2.x or 3.1, you will need to install the backport futures. Then you just do this:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
    results = executor.map(parse_file, filenames)
Now, 30 workers is probably way too many. You'll overwhelm the hard drive and its drivers and end up having most of your threads waiting for the disk to seek. But a small number may be worth doing. And it's ridiculously easy to tweak max_workers and test the timing to see where the sweet spot is for your system.
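For example, a rough timing loop like this hypothetical sketch is enough to compare a few pool sizes (parse_file and filenames are the same assumed names as above):

import concurrent.futures
import time

for workers in (1, 2, 4, 8, 16, 30):
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # Drain the iterator so all files are actually parsed before we stop the clock.
        list(executor.map(parse_file, filenames))
    print(workers, "workers:", time.time() - start, "seconds")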
If your code is doing more CPU work than I/O work—that is, it spends more time parsing strings and building complicated structures and the like than it does reading from the disk—then threads won't help, at least in CPython, because of the Global Interpreter Lock. But you can solve that by using processes.
From a code point of view, this is trivial: just change ThreadPoolExecutor to ProcessPoolExecutor.
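A minimal sketch of the process-pool variant, under the assumption that parse_file is a module-level function (worker processes need to be able to import it, and the __main__ guard matters because they re-import the script on some platforms); the parser body and file location here are hypothetical:

import concurrent.futures
import glob

def parse_file(filename):
    # Stand-in for a CPU-heavy parser.
    with open(filename) as f:
        return len(f.read().split())

if __name__ == "__main__":
    filenames = glob.glob("data/*.txt")  # hypothetical location of the 30,000 files
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(parse_file, filenames))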
However, if you're returning large or complex data structures, the time spent serializing them across the process boundary may eat into, or even overwhelm, your savings. If that's the case, you can sometimes improve things by batching up larger jobs:
def parse_files(filenames):
    # Parse a whole batch in one task, so results cross the process
    # boundary once per batch instead of once per file.
    return [parse_file(filename) for filename in filenames]

with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
    results = executor.map(parse_files, grouper(10, filenames))
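grouper isn't defined above; presumably it's the standard itertools "grouper" recipe. A sketch of one version (Python 3 names), with the None padding of the last batch filtered out so parse_file never sees it:

from itertools import zip_longest

def grouper(n, iterable):
    # Yield batches of n items; zip_longest pads the final batch with None,
    # so strip those out before handing the batch to a worker.
    for batch in zip_longest(*[iter(iterable)] * n):
        yield [name for name in batch if name is not None]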
But sometimes you probably need to drop to a lower level and use the multiprocessing module, which has features like inter-process memory sharing.
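For example, multiprocessing lets workers write results straight into shared memory instead of pickling them back to the parent; a minimal sketch (the worker and data here are made up for illustration):

import multiprocessing

def fill_squares(shared, start, count):
    # Each worker writes directly into the shared array, so nothing is
    # serialized back across the process boundary.
    for i in range(start, start + count):
        shared[i] = i * i

if __name__ == "__main__":
    shared = multiprocessing.Array("d", 100)  # 100 doubles in shared memory
    workers = [
        multiprocessing.Process(target=fill_squares, args=(shared, i * 25, 25))
        for i in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(shared[:5])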
If you can't or don't want to use futures, Python 2.6+ has multiprocessing.Pool for a plain process pool, and a thread pool with the same interface under the name multiprocessing.ThreadPool (not documented) or multiprocessing.dummy.Pool (documented but ugly).
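A minimal sketch of the same map with the older Pool interface (the explicit close/join is the idiom that works on older versions too; parse_file and filenames are the assumed names from above):

from multiprocessing.dummy import Pool  # thread pool with the Pool interface

pool = Pool(8)
try:
    results = pool.map(parse_file, filenames)
finally:
    pool.close()
    pool.join()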
In a trivial case like this, there's really no difference between a plain pool and an executor. And, as mentioned above, in very complicated cases, multiprocessing lets you get under the hood. In the middle, futures is often simpler. But it's worth learning both.