3

我有一个具有这种格式的几个 GB 的文本文件

0 274 593869.99 6734999.96 121.83 1,
0 273 593869.51 6734999.92 121.57 1,
0 273 593869.15 6734999.89 121.57 1,
0 273 593868.79 6734999.86 121.65 1,
0 273 593868.44 6734999.84 121.65 1,
0 273 593869.00 6734999.94 124.21 1,
0 273 593868.68 6734999.92 124.32 1,
0 273 593868.39 6734999.90 124.44 1,
0 273 593866.94 6734999.71 121.37 1,
0 273 593868.73 6734999.99 127.28 1,

我有一个简单的函数可以在 Windows 上的 Python 2.7 中进行过滤。该函数读取整个文件,选择相同的行idtile(第一列和第二列)并返回点列表(x、y、z 和标签)和idtile.

tiles_id = [j for j in np.ndindex(ny, nx)] #ny = number of row, nx= number of columns
idtile = tiles_id[0]

def file_filter(name,idtile):
        lst = []
        for line in file(name, mode="r"):
            element = line.split() # add value
            if (int(element[0]),int(element[1])) == idtile:
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
        return(lst, dy, dx)

该文件超过 32 GB,瓶颈是文件的读取。我正在寻找一些建议或示例以加快我的功能(例如:并行计算或其他方法)。

我的解决方案是将文本文件拆分为图块(使用 x 和 y 位置)。该解决方案并不优雅,我正在寻找一种有效的方法。

4

10 回答 10

1

速度的主要规则:少做。

  • 您可以创建一个排序版本huge.txt并跟踪idtitle. 因此,如果您搜索 (223872, 23239),您已经知道给定信息在文件中的哪个位置,并且可以跳过 ( file.seek) 之前的所有内容。有人可能会争辩说,此信息在某种程度上等于'INDEX'
  • 您可以使用mmap将文件映射到内存中。
  • 您可以开始编写“工人”来处理不同的文件/职位
  • 您可以将给定的文件传输到 SQL 服务器之类的东西中,并使用标准 SQL 来检索数据。是的,将 32gig 的数据传输到数据库需要时间,但可以将其视为一种预处理。之后,数据库将使用任何方式比您的方法更快地访问数据。

小想法:

  • 您可以使用切片而不是line.split()避免大量微小的内存分配。

如何使用数据库的概述:

假设你有类似的东西PostgreSQL

CREATE TABLE tiles
(
  tile_x integer,
  tile_y integer,
  x double precision,
  y double precision,
  z double precision,
  flag integer
);

然后,您可以将输入文件中的所有空格替换为|和所有的,以创建一个 nice 和 shiney .csv)并将其直接输入数据库:

 COPY "tiles" from "\full\path\to\huge.txt" WITH DELIMITER "|";

然后你可以做这样的花哨的事情:

SELECT * FROM "tiles";

tile_x | tile_y |     x     |     y      |   z    | flag
-------+--------+-----------+------------+--------+-----
     0 |    274 | 593869.99 | 6734999.96 | 121.83 |    1
     0 |    273 | 593869.51 | 6734999.92 | 121.57 |    1
     0 |    273 | 593869.15 | 6734999.89 | 121.57 |    1
     0 |    273 | 593868.79 | 6734999.86 | 121.65 |    1
     0 |    273 | 593868.44 | 6734999.84 | 121.65 |    1
     0 |    273 |    593869 | 6734999.94 | 124.21 |    1
     0 |    273 | 593868.68 | 6734999.92 | 124.32 |    1
     0 |    273 | 593868.39 |  6734999.9 | 124.44 |    1
     0 |    273 | 593866.94 | 6734999.71 | 121.37 |    1
     0 |    273 | 593868.73 | 6734999.99 | 127.28 |    1 

或者是这样的:

SELECT * FROM "tiles" WHERE tile_y > 273;

tile_x | tile_y |     x     |     y      |   z    | flag
-------+--------+-----------+------------+--------+-----
     0 |    274 | 593869.99 | 6734999.96 | 121.83 |    1
于 2013-03-05T12:52:05.503 回答
1

也许最好和最快的方法是在(大规模)并行系统上使用 map reduce 算法来解决您的问题。

于 2013-03-06T07:39:22.677 回答
1

您可以通过进行字符串比较来避免在每一行上执行split()and :int()

def file_filter(name,idtile):
    lst = []
    id_str = "%d %d " % idtile
    with open(name) as f:
        for line in f:
            if line.startswith(id_str):
                element = line.split() # add value
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
    return(lst, dy, dx)
于 2013-03-05T13:37:02.283 回答
1

我建议您更改代码,以便您读取一次大文件并为每个磁贴 id 编写(临时)文件。就像是:

def create_files(name, idtiles=None):
    files = {}
    for line in open(name):
         elements = line.split()
         idtile = (int(elements[0]), int(elements[1]))
         if idtiles is not None and idtile not in idtiles:
             continue
         if idtile not in files:
             files[idtile] = open("tempfile_{}_{}".format(elements[0], elements[1]), "w")
         print >>files[idtile], line
    for f in files.itervalues():
        f.close()
    return files

create_files()将返回一个{(tilex, tiley): fileobject}字典。

一种在写入每一行后关闭文件的变体,以解决“打开的文件过多”错误。此变体返回一个{(tilex, tiley: filename}字典。应该会慢一些。

def create_files(name, idtiles=None):
    files = {}
    for line in open(name):
         elements = line.split()
         idtile = (int(elements[0]), int(elements[1]))
         if idtiles is not None and idtile not in idtiles:
             continue
         filename = "tempfile_{}_{}".format(elements[0], elements[1])
         files[idtile] = filename
         with open(filename, "a") as f:
             print >>f, line
    return files
于 2013-03-05T12:39:31.123 回答
1

你的'idtile'似乎有一定的顺序。也就是说,样本数据表明,一旦您遍历某个“idtile”并点击下一个,则不可能再次出现带有该“idtile”的行。如果是这种情况,您可能会在for处理完您想要的“idtile”并点击不同的“idtile”后打破循环。在我的头顶上:

loopkiller = false
for line in file(name, mode="r"):
    element = line.split()
    if (int(element[0]),int(element[1])) == idtile:
        lst.append(element[2:])
        dy, dx = int(element[0]),int(element[1])
        loopkiller = true
    elif loopkiller:
        break;

这样,一旦你完成了某个“idtile”,你就会停下来;而在您的示例中,您继续阅读直到文件末尾。

如果您的 idtiles 以随机顺序出现,也许您可​​以尝试先编写文件的有序版本。

此外,单独评估您的 idtile 的数字可能会帮助您更快地遍历文件。假设您idtile是一个两位数和三位整数的二元组,可能类似于以下内容:

for line in file(name, mode="r"):
    element = line.split()
    if int(element[0][0]) == idtile[0]:
        if element[1][0] == str(idtile[1])[0]:
            if element[1][1] == str(idtile[1])[1]:
                if element[1][2] == str(idtile[1])[2]:
                    dy, dx = int(element[0]),int(element[1])
                else go_forward(walk)
            else go_forward(run)
         else go_forward(sprint)
     else go_forward(warp)
于 2013-03-05T12:41:35.957 回答
1

这是一些统计数据。随着更多解决方案的出现,我将对其进行更新。以下程序适用于包含问题中重复行的文件。

import sys

def f0(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            pass


"""def f0(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            line.split()"""


def f1(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            element = line.split() # add value
            if (int(element[0]),int(element[1])) == idtile:
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
    return(lst, dy, dx)


def f2(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            element = line.split() # add value
            pdx, pdy = int(element[0]),int(element[1])
            if (pdx, pdy) == idtile:
                lst.append(element[2:])
                dy, dx = pdx, pdy
    return(lst, dy, dx)

def f2(name, idtile):
    lst = []
    dx, dy = None, None
    with open(name) as f:
        for line in f:
            element = line.split() # add value
            pdx, pdy = int(element[0]),int(element[1])
            if (pdx, pdy) == idtile:
                lst.append(element[2:])
                dy, dx = pdx, pdy
    return(lst, dy, dx)


def f3(name,idtile):
    lst = []
    id_str = "%d %d " % idtile
    with open(name) as f:
        for line in f:
            if line.startswith(id_str):
                element = line.split() # add value
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
    return(lst, dy, dx)

functions = [f0, f1, f2, f3]

functions[int(sys.argv[1])]("./srcdata.txt",(0, 273))

计时的 shell 脚本很简单:

#!/bin/sh
time python ./timing.py 0
time python ./timing.py 1
time python ./timing.py 2

我更喜欢以这种方式运行它,以避免以前运行的函数影响其他函数的时间。

结果是:

0.02user 0.01system 0:00.03elapsed
0.42user 0.01system 0:00.44elapsed
0.32user 0.02system 0:00.34elapsed
0.33user 0.01system 0:00.35elapsed

好消息:读取文件不是瓶颈消除对 int 的额外传输是有效的

坏消息:我现在仍然不知道如何显着优化它。

于 2013-03-05T14:47:36.400 回答
1

我的解决方案是将大文本文件拆分为每个 idtile 的许多小二进制文件。为了更快地阅读文本文件,您可以使用 pandas:

import pandas as pd
import numpy as np
n = 400000 # read n rows as one block
for df in pd.read_table(large_text_file, sep=" ", comment=",", header=None, chunksize=n):
    for key, g in df.groupby([0, 1]):
        fn = "%d_%d.tmp" % key
            with open(fn, "ab") as f:
                data = g.ix[:, 2:5].values
                data.tofile(f)

然后您可以通过以下方式获取一个二进制文件的内容:

np.fromfile("0_273.tmp").reshape(-1, 4)
于 2013-03-05T13:30:20.550 回答
1

您可以将过滤器转换为生成器函数:

def file_filter(name):
        lst = []
        idtile = None
        for line in file(name, mode="r"):
            element = line.split() # add value
            if idtile is None:
               idtile = (int(element[0]), int(element[1]))
            if (int(element[0]), int(element[1])) == idtile:
                lst.append(element[2:])
                dy, dx = int(element[0]),int(element[1])
            else:
                yield lst, dx, dy
                lst = []
                idtile = None

如果文件在 idtile 上排序,此函数应list_of_data, dx, dy为每个 idtile返回一个元组

新的你可以像这样使用它:

for lst, dx, dy in file_filter('you_name_it'):
    process_tile_data(lst, dx, dy)
于 2013-03-05T12:56:08.497 回答
1

我建议比较一下完整阅读过程所用的时间,以及只阅读线条而不对它们做任何事情的时间。如果这些时间很接近,您唯一能做的就是改变方法(拆分文件等),因为您可能可以优化的是数据处理时间,而不是文件读取时间。

我还在您的代码中看到了两个值得修复的时刻:

with open(name) as f:
    for line in f:
        pass #Here goes the loop body
  1. 用于with显式关闭您的文件。您的解决方案应该在 CPython 中运行,但这取决于实现,并且可能并不总是那么有效。

  2. 您将字符串转换为int两次。这是一个相对缓慢的操作。通过重用结果删除第二个。

PS它看起来像地球表面上一组点的深度或高度值数组,并且表面被分割成瓷砖。:-)

于 2013-03-05T12:59:45.090 回答
0

好的。如果您需要多次执行此操作,您显然需要创建某种索引。但是,如果这不是一个频繁的活动,最好的选择是像这样多线程。

NUMWORKERS = 8
workerlist = []
workQ = Queue.Queue()

def file_filter(name,idtile, numworkers):
    for i in range(NUMWORKERS):
        worker = threading.Thread(target=workerThread, args=(lst,))
    lst = []
    for line in file(name, mode="r"):
        workQ.put(line)                
    for i in range(NUMWORKERS):
        workQ.put(None)
    workQ.join()
    return lst , idtile[0], idtile[1]

def workerThread(lst):
    line = workQ.get()
    if not line:
        return
    element = line.split() # add value
    if (int(element[0]),int(element[1])) == idtile:
        lst.append(element[2:])

如果这是每个idtile都发生的非常频繁的活动,那么解决方案将大不相同。对多个 idtile 一起执行此操作将为您提供最佳的摊销性能。因为任何已知数量的 idtile 都可以在文件的单个循环中处理。

于 2013-03-05T13:19:08.357 回答