数据生成
我使用这个 Perl 脚本创建了 10 个文件,每个文件有 300,000 行,每行包含 11 个字符(主要是数字)和一个换行符:
#!/usr/bin/env perl
use strict;
use warnings;
# Generate sorted, merge-worthy data
die "Usage: num-files lines-per-file" unless scalar(@ARGV) == 2;
my $num_files = $ARGV[0];
my $num_lines = $ARGV[1];
die "Enter a number of files between 2 and 999" unless $num_files >= 2 && $num_files <= 999;
die "Enter a number of lines between 2 and 9,999,999" unless $num_files >= 2 && $num_files <= 9_999_999;
my $base = "datafile";
my $digits = length($num_files . '');
my $format = "$base.%.${digits}d";
foreach my $i (1..$num_files)
{
my $file = sprintf $format, $i;
open my $fh, '>', $file or die "Failed to create $file";
foreach my $n (1..$num_lines)
{
printf $fh "%.7d-%.3d\n", $n + 60 * $i, $i;
}
close $fh;
}
Perl 5.18.0 花了大约 1.5 秒来生成 10 个文件(在 MacBook Pro 上,2.3 GHz Intel Core i7,16 GiB 1333 MHz RAM,老式旋转磁盘;有用,但不是非常强大)。数据的设计使得文件之间存在重叠,而且每一行都是唯一的,并标识它来自的文件(这就是将$offset
文件中的序列号放在文件号之前的目的)。例如,在测试 3 个文件时,在文件 1 和 2 中的数据交错之前,只有文件 1 中的一些数据,然后来自所有 3 个文件,然后文件 1 用完,文件 2 用完。它很好地覆盖了不同的文件合并算法的一部分(但您可以创建更多场景)。
然后我创建了一个merge
程序,如下面的“代码”部分所示。没有什么很花哨的。它从 Jon Bentley 的“More Programming Pearls”中提取了一些堆管理算法;原件作为评论引用。唯一棘手的一点是比较例程的意义:这导致我一开始就出现了一些错误的答案,以及间接级别。
计时结果
在代表约 35 MiB 数据的 10 个文件上运行时:
$ ls -l datafile.??
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.01
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.02
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.03
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.04
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.05
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.06
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.07
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.08
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.09
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 20:37 datafile.10
$
$ for file in datafile.??; do echo $file; sort -c $file; done
datafile.01
datafile.02
datafile.03
datafile.04
datafile.05
datafile.06
datafile.07
datafile.08
datafile.09
datafile.10
$ time ./merge datafile.?? > datafile.out
real 0m0.510s
user 0m0.314s
sys 0m0.074s
$ time ./merge datafile.?? > datafile.out
real 0m0.513s
user 0m0.317s
sys 0m0.080s
$ time sort -m datafile.?? > datafile.merge
real 0m10.061s
user 0m9.960s
sys 0m0.083s
$ time sort -m datafile.?? > datafile.merge
real 0m10.039s
user 0m9.921s
sys 0m0.082s
$ ls -l datafile.*
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.01
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.02
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.03
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.04
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.05
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.06
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.07
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.08
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 14:09 datafile.09
-rw-r--r-- 1 jleffler staff 3600000 Sep 15 20:37 datafile.10
-rw-r--r-- 1 jleffler staff 36000000 Sep 15 20:42 datafile.merge
-rw-r--r-- 1 jleffler staff 36000000 Sep 15 20:41 datafile.out
$ cmp datafile.out datafile.merge
$ sort -c datafile.out
$
解释这些结果:
- 第一个
ls
清单显示了 10 个数据文件。
for
循环检查输入文件是否单独按排序顺序。
- 我编写的
merge
程序运行时间约为 0.5 秒。
sort
合并模式下的系统-m
(
- 第二个
ls
清单显示输出文件的大小相同。
- 该
cmp
命令显示输出文件是相同的。
- 该
sort -c
命令显示输出文件已排序。
我分别并随后在 52 秒内创建了 101 个文件,每个文件有 1,000,000 条记录,每个记录 12 字节。我在 20 秒内合并了文件(生成大约 1179 MiB 的输出文件 — 1,236,000,000 字节)。系统sort
在 467 (7m47s) 秒内将它们合并(又名“永远”)。sort -c
检查输出文件是否按排序顺序花费了大约 90 秒。比较两个大文件只用了cmp
不到 2 秒的时间。
我很好奇系统sort
在合并时如此缓慢。
结果解释
- 在我的 Mac 上,可以在大约半秒内从 10 个输入文件中合并约 35 MiB 的数据。
- 系统
sort
可以在大约十秒钟内完成这项工作。
- 通过推断(假设我的机器不再是火箭,如果它曾经是的话),应该可以在不到 20 秒的时间内在 Windows 机器上合并约 35 MiB(允许您在性能上产生 40 倍的差异,我不这样做) '认为你不应该需要)。
- 因此,您的代码花费 30 秒时间太长了——问题是“为什么”。你应该仔细研究你的算法和数据结构。
代码
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct File
{
const char *file;
FILE *fp;
char *line;
size_t reserve; /* Space allocated for line */
size_t length; /* Space used by current line */
} File;
extern void err_exit(const char *fmt, ...);
extern void read_line(File *current);
extern void write_line(File *current);
extern void heapify(size_t *heap, size_t heap_size, File *inputs);
extern void siftup(size_t *heap, size_t lo, size_t hi, File *inputs);
extern void siftdown(size_t *heap, size_t lo, size_t hi, File *inputs);
extern int compare(File *inputs, size_t i1, size_t i2);
const char *arg0;
int main(int argc, char **argv)
{
File inputs[argc];
arg0 = argv[0];
/* Open files for reading - load first lines */
for (int i = 1; i < argc; i++)
{
File *current = &inputs[i];
current->file = argv[i];
if ((current->fp = fopen(current->file, "r")) == 0)
err_exit("failed to open file %s for reading", current->file);
current->reserve = 128;
if ((current->line = malloc(current->reserve)) == 0)
err_exit("failed to allocate %zu bytes memory", current->reserve);
current->length = 0;
read_line(current);
}
#if defined(CHECK_FUNDAMENTALS)
for (int i = 1; i < argc; i++)
printf("%d: %zu - %s\n", i, inputs[i].length, inputs[i].file);
#endif
size_t heap_size = argc - 1;
size_t heap[argc]; // heap[0] unused
for (int i = 1; i < argc; i++)
heap[i] = i;
#if defined(CHECK_FUNDAMENTALS)
printf("Heap before:\n");
for (int i = 1; i < argc; i++)
printf("%d: %zu - %s", i, heap[i], inputs[heap[i]].line);
#endif
heapify(heap, heap_size, inputs);
#if defined(CHECK_FUNDAMENTALS)
printf("Heap after:\n");
for (int i = 1; i < argc; i++)
printf("%d: %zu - %s", i, heap[i], inputs[heap[i]].line);
#endif
#if defined(CHECK_FUNDAMENTALS)
printf("Compare two lines:\n");
printf("1: %s\n", inputs[1].line);
printf("2: %s\n", inputs[2].line);
int r12 = compare(inputs, 1, 2);
int r21 = compare(inputs, 2, 1);
int r11 = compare(inputs, 1, 1);
printf("1 vs 2: %d\n", r12);
printf("2 vs 1: %d\n", r21);
printf("1 vs 1: %d\n", r11);
#endif
while (heap_size > 0)
{
File *current = &inputs[heap[1]];
write_line(current);
read_line(current);
if (current->line == 0)
heap[1] = heap[heap_size--];
if (heap_size > 0)
{
siftdown(heap, 1, heap_size, inputs);
#if defined(CHECK_FUNDAMENTALS)
printf("Heap check:\n");
for (int i = 1; i < argc; i++)
printf("%d: %zu - %s", i, heap[i], inputs[heap[i]].line);
#endif
}
}
return 0;
}
int compare(File *inputs, size_t i1, size_t i2)
{
return strcmp(inputs[i1].line, inputs[i2].line);
}
void heapify(size_t *heap, size_t heap_size, File *inputs)
{
for (size_t i = 1; i <= heap_size; i++)
siftup(heap, 1, i, inputs);
}
/* J Bentley: More Programming Pearls
**
** Heap in array x, indexed from 1, not 0 as is normal in C.
** Implementation will allocate but not use array[0].
**
** function siftup(l, u, i, p) {
** # pre maxheap(l, u-1)
** # post maxheap(l, u)
** i = u
** while (1) {
** # maxheap(l, u) except between i and its parent
** if (i <= l) break
** p = int(i/2)
** if (x[p] >= x[i]) break
** swap(p, i)
** i = p
** }
** }
**
** function siftdown(l, u, i, c) {
** # pre maxheap(l+1, u)
** # post maxheap(l,u)
** i = l
** while (1) {
** # maxheap(l, u) except between i and its children
** c = 2*i
** if (c > u) break
** if (c + 1 <= u && x[c+1] > x[c]) c++
** if (x[i] >= x[c]) break
** swap(c, i)
** i = c
** }
** }
*/
void siftup(size_t *heap, size_t lo, size_t hi, File *inputs)
{
size_t i = hi;
while (1)
{
if (i <= lo)
break;
size_t p = i / 2;
if (compare(inputs, heap[p], heap[i]) <= 0)
break;
size_t t = heap[p];
heap[p] = heap[i];
heap[i] = t;
i = p;
}
}
void siftdown(size_t *heap, size_t lo, size_t hi, File *inputs)
{
size_t i = lo;
while (1)
{
size_t c = 2 * i;
if (c > hi)
break;
if (c + 1 <= hi && compare(inputs, heap[c+1], heap[c]) < 0)
c++;
if (compare(inputs, heap[i], heap[c]) <= 0)
break;
size_t t = heap[c];
heap[c] = heap[i];
heap[i] = t;
i = c;
}
}
void read_line(File *current)
{
char buffer[4096];
if (fgets(buffer, sizeof(buffer), current->fp) != 0)
{
size_t length = strlen(buffer) + 1;
if (length > current->reserve)
{
void *space = realloc(current->line, length);
if (space == 0)
err_exit("failed to reallocate %zu bytes memory", length);
current->line = space;
current->reserve = length;
}
strcpy(current->line, buffer);
current->length = length - 1;
}
else
{
fclose(current->fp);
current->fp = 0;
free(current->line);
current->line = 0;
current->length = 0;
current->reserve = 0;
}
}
void write_line(File *current)
{
if (fwrite(current->line, sizeof(char), current->length, stdout) != current->length)
err_exit("short write of line from %s of length %zu", current->file, current->length);
}
void err_exit(const char *fmt, ...)
{
int errnum = errno;
va_list args;
fprintf(stderr, "%s: ", arg0);
va_start(args, fmt);
vfprintf(stderr, fmt, args);
va_end(args);
if (errnum != 0)
fprintf(stderr, " (%d: %s)", errnum, strerror(errnum));
putc('\n', stderr);
exit(EXIT_FAILURE);
}
该代码旨在处理长达 4 KiB 的行;修改它以使用 POSIX getline()
来处理更长的行并不难。它假设所有文件都可以一次打开(这意味着在大多数机器上的上限约为 250 个输入文件而无需调整限制)。如果它不能打开所有文件而不是对中间文件进行多次合并,它将停止。