我通常使用约 20 Gb 大小的文本文件,我发现自己经常计算给定文件中的行数。
我现在做的方式只是cat fname | wc -l
,而且需要很长时间。有没有更快的解决方案?
我在安装了 Hadoop 的高性能集群中工作。我想知道 map reduce 方法是否有帮助。
我希望解决方案像解决方案一样简单wc -l
,但不确定它的可行性。
有任何想法吗?
尝试:sed -n '$=' filename
cat 也是不必要的:wc -l filename
以您目前的方式就足够了。
您的限制速度因素是存储设备的 I/O 速度,因此在简单的换行符/模式计数程序之间进行更改将无济于事,因为这些程序之间的执行速度差异可能会被较慢的磁盘/存储/不管你有什么。
但是,如果您在磁盘/设备之间复制了相同的文件,或者文件分布在这些磁盘之间,您当然可以并行执行该操作。我不具体了解这个 Hadoop,但假设您可以从 4 个不同位置读取 10gb 文件,您可以运行 4 个不同的行计数过程,每个过程都在文件的一部分中,并将它们的结果总结起来:
$ dd bs=4k count=655360 if=/path/to/copy/on/disk/1/file | wc -l &
$ dd bs=4k skip=655360 count=655360 if=/path/to/copy/on/disk/2/file | wc -l &
$ dd bs=4k skip=1310720 count=655360 if=/path/to/copy/on/disk/3/file | wc -l &
$ dd bs=4k skip=1966080 if=/path/to/copy/on/disk/4/file | wc -l &
注意&
在每个命令行,所以所有将并行运行;dd
像cat
这里一样工作,但允许我们指定要读取多少字节(count * bs
字节)以及在输入开头跳过多少字节(skip * bs
字节)。它以块为单位工作,因此需要指定bs
为块大小。在此示例中,我已将 10Gb 文件划分为 4 个相等的 4Kb * 655360 = 2684354560 字节 = 2.5GB 的块,为每个作业分配一个,您可能需要根据文件的大小设置一个脚本来为您执行此操作文件和您将运行的并行作业的数量。您还需要总结执行的结果,因为我缺乏 shell 脚本能力而没有做的事情。
如果您的文件系统足够智能,可以在许多设备(如 RAID 或分布式文件系统或其他设备)之间拆分大文件,并自动并行化可以并行化的 I/O 请求,您可以进行这样的拆分,运行许多并行作业,但使用相同的文件路径,你仍然可能有一些速度增益。
编辑:我想到的另一个想法是,如果文件中的行大小相同,则可以通过将文件大小除以行大小来获得确切的行数,均以字节为单位。您几乎可以在一项工作中立即完成。如果您有平均大小并且不完全关心行数,但想要估计,您可以执行相同的操作并比精确操作更快地获得令人满意的结果。
根据我的测试,我可以验证 Spark-Shell(基于 Scala)比其他工具(GREP、SED、AWK、PERL、WC)快得多。这是我在一个有 23782409 行的文件上运行的测试结果
time grep -c $ my_file.txt;
真实 0m44.96s 用户 0m41.59s 系统 0m3.09s
time wc -l my_file.txt;
真实 0m37.57s 用户 0m33.48s 系统 0m3.97s
time sed -n '$=' my_file.txt;
真实 0m38.22s 用户 0m28.05s 系统 0m10.14s
time perl -ne 'END { $_=$.;if(!/^[0-9]+$/){$_=0;};print "$_" }' my_file.txt
;
真实 0m23.38s 用户 0m20.19s 系统 0m3.11s
time awk 'END { print NR }' my_file.txt;
真实 0m19.90s 用户 0m16.76s 系统 0m3.12s
spark-shell
import org.joda.time._
val t_start = DateTime.now()
sc.textFile("file://my_file.txt").count()
val t_end = DateTime.now()
new Period(t_start, t_end).toStandardSeconds()
res1: org.joda.time.Seconds = PT15S
在多核服务器上,使用GNU parallel来并行计算文件行数。打印每个文件的行数后, bc 将所有行数相加。
find . -name '*.txt' | parallel 'wc -l {}' 2>/dev/null | paste -sd+ - | bc
为了节省空间,您甚至可以压缩所有文件。以下行解压缩每个文件并并行计算其行数,然后对所有计数求和。
find . -name '*.xz' | parallel 'xzcat {} | wc -l' 2>/dev/null | paste -sd+ - | bc
如果您的数据位于 HDFS 上,那么最快的方法可能是使用 hadoop 流。Apache Pig 的 COUNT UDF 对包进行操作,因此使用单个 reducer 来计算行数。相反,您可以在简单的 hadoop 流脚本中手动设置 reducer 的数量,如下所示:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar -Dmapred.reduce.tasks=100 -input <input_path> -output <output_path> -mapper /bin/cat -reducer "wc -l"
请注意,我手动将减速器的数量设置为 100,但您可以调整此参数。一旦 map-reduce 作业完成,每个 reducer 的结果将存储在单独的文件中。最终的行数是所有 reducer 返回的数字的总和。您可以获得最终的行数,如下所示:
$HADOOP_HOME/bin/hadoop fs -cat <output_path>/* | paste -sd+ | bc
我知道这个问题现在已经有几年了,但是扩展了Ivella 的最后一个想法,这个 bash 脚本通过测量一行的大小并从中推断,在几秒钟或更短的时间内估计一个大文件的行数:
#!/bin/bash
head -2 $1 | tail -1 > $1_oneline
filesize=$(du -b $1 | cut -f -1)
linesize=$(du -b $1_oneline | cut -f -1)
rm $1_oneline
echo $(expr $filesize / $linesize)
如果将此脚本命名为lines.sh
,则可以调用lines.sh bigfile.txt
以获取估计的行数。在我的情况下(大约 6 GB,从数据库导出),与真实行数的偏差仅为 3%,但运行速度快了大约 1000 倍。顺便说一句,我使用第二行而不是第一行作为基础,因为第一行有列名,而实际数据从第二行开始。
Hadoop 本质上提供了一种机制来执行类似于@Ivella 所建议的事情。
Hadoop 的 HDFS(分布式文件系统)将获取您的 20GB 文件并将其以固定大小的块的形式保存在集群中。假设您将块大小配置为 128MB,文件将被拆分为 20x8x128MB 块。
然后,您将对这些数据运行 map reduce 程序,基本上计算每个块的行数(在 map 阶段),然后将这些块行数减少为整个文件的最终行数。
至于性能,通常集群越大,性能就越好(更多的 wc 并行运行,在更多独立的磁盘上),但是作业编排有一些开销,这意味着在较小的文件上运行作业实际上不会更快地产生吞吐量比运行本地 wc
我不确定python是否更快:
[root@myserver scripts]# time python -c "print len(open('mybigfile.txt').read().split('\n'))"
644306
real 0m0.310s
user 0m0.176s
sys 0m0.132s
[root@myserver scripts]# time cat mybigfile.txt | wc -l
644305
real 0m0.048s
user 0m0.017s
sys 0m0.074s
如果您的瓶颈是磁盘,那么您如何读取它很重要。dd if=filename bs=128M | wc -l
比我的具有 HDD 和快速 CPU 和 RAM 的机器要快得多。您可以尝试使用块大小并查看吞吐量报告。我把它调高到 1GiB。wc -l filename
cat filename | wc -l
dd
注意:关于是否更快有一些cat
争论dd
。我所声称的是,这dd
可以更快,这取决于系统,而且它适合我。自己试试吧。
如果你的电脑有 python,你可以从 shell 试试这个:
python -c "print len(open('test.txt').read().split('\n'))"
这用于python -c
传入一个命令,该命令基本上是读取文件,并由“换行符”分割,以获取换行符的数量或文件的总长度。
bash-3.2$ sed -n '$=' test.txt
519
使用上述:
bash-3.2$ python -c "print len(open('test.txt').read().split('\n'))"
519
find -type f -name "filepattern_2015_07_*.txt" -exec ls -1 {} \; | cat | awk '//{ print $0 , system("cat " $0 "|" "wc -l")}'
输出:
我有一个 645GB 的文本文件,早期的精确解决方案(例如wc -l
)都没有在 5 分钟内返回答案。
相反,这是一个 Python 脚本,它计算一个大文件中的大致行数。(我的文本文件显然有大约 55 亿行。)Python 脚本执行以下操作:
A. 计算文件中的字节数。
B. 读取文件中的第一N
行(作为示例)并计算平均行长度。
C. 将 A/B 计算为近似的行数。
它遵循Nico 的 answer,但不是取一行的长度,而是计算第一N
行的平均长度。
注意:我假设一个 ASCII 文本文件,所以我希望 Pythonlen()
函数将字符数作为字节数返回。
将此代码放入文件中line_length.py
:
#!/usr/bin/env python
# Usage:
# python line_length.py <filename> <N>
import os
import sys
import numpy as np
if __name__ == '__main__':
file_name = sys.argv[1]
N = int(sys.argv[2]) # Number of first lines to use as sample.
file_length_in_bytes = os.path.getsize(file_name)
lengths = [] # Accumulate line lengths.
num_lines = 0
with open(file_name) as f:
for line in f:
num_lines += 1
if num_lines > N:
break
lengths.append(len(line))
arr = np.array(lengths)
lines_count = len(arr)
line_length_mean = np.mean(arr)
line_length_std = np.std(arr)
line_count_mean = file_length_in_bytes / line_length_mean
print('File has %d bytes.' % (file_length_in_bytes))
print('%.2f mean bytes per line (%.2f std)' % (line_length_mean, line_length_std))
print('Approximately %d lines' % (line_count_mean))
N
使用=5000像这样调用它。
% python line_length.py big_file.txt 5000
File has 645620992933 bytes.
116.34 mean bytes per line (42.11 std)
Approximately 5549547119 lines
所以文件中有大约 55 亿行。
在收集数据以供 wc 处理时,较慢的 IO 回退到dd if={file} bs=128M | wc -l
极大地帮助。
我也偶然发现
https://github.com/crioux/turbo-linecount
这很棒。
让我们假设:
那么你真的想将文件分成几部分,在多个节点上并行计算部分并总结那里的结果(这基本上是@Chris White 的想法)。
以下是使用 GNU Parallel(版本 > 20161222)的方法。您需要列出其中的节点,~/.parallel/my_cluster_hosts
并且您必须有权ssh
访问所有节点:
parwc() {
# Usage:
# parwc -l file
# Give one chunck per host
chunks=$(cat ~/.parallel/my_cluster_hosts|wc -l)
# Build commands that take a chunk each and do 'wc' on that
# ("map")
parallel -j $chunks --block -1 --pipepart -a "$2" -vv --dryrun wc "$1" |
# For each command
# log into a cluster host
# cd to current working dir
# execute the command
parallel -j0 --slf my_cluster_hosts --wd . |
# Sum up the number of lines
# ("reduce")
perl -ne '$sum += $_; END { print $sum,"\n" }'
}
用于:
parwc -l myfile
parwc -w myfile
parwc -c myfile