9

所以我写了一个 Python 程序来处理一个小数据处理任务。

这是我想要的计算的虚构语言的非常简短的规范:

parse "%s %lf %s" aa bb cc | group_by aa | quickselect --key=bb 0:5 | \
    flatten | format "%s %lf %s" aa bb cc

也就是说,对于每一行,解析出一个单词、一个浮点数和另一个单词。将它们视为玩家 ID、分数和日期。我想要每个球员的前五名得分和日期。数据量不小,但也不大;大约 630 兆字节。

我想知道我应该用什么真正的可执行语言来编写它以使其同样简短(如下面的 Python)但要快得多。

#!/usr/bin/python
# -*- coding: utf-8; -*-
import sys

top_5 = {}

for line in sys.stdin:
    aa, bb, cc = line.split()

    # We want the top 5 for each distinct value of aa.  There are
    # hundreds of thousands of values of aa.
    bb = float(bb)
    if aa not in top_5: top_5[aa] = []
    current = top_5[aa]
    current.append((bb, cc))

    # Every once in a while, we drop the values that are not in
    # the top 5, to keep our memory footprint down, because some
    # values of aa have thousands of (bb, cc) pairs.
    if len(current) > 10:
        current.sort()
        current[:-5] = []

for aa in top_5:
    current = top_5[aa]
    current.sort()
    for bb, cc in current[-5:]:
        print aa, bb, cc

这是一些示例输入数据:

3 1.5 a
3 1.6 b
3 0.8 c
3 0.9 d
4 1.2 q
3 1.5 e
3 1.8 f
3 1.9 g

这是我从中得到的输出:

3 1.5 a
3 1.5 e
3 1.6 b
3 1.8 f
3 1.9 g
4 1.2 q

有 7 个值3,因此我们删除cd值,因为它们的bb值将它们排除在前 5 之外。因为4只有一个值,所以它的“前 5”只包含那个值。

这比在 MySQL 中执行相同的查询运行得更快(至少,我们发现执行查询的方式),但我很确定它大部分时间都花在 Python 字节码解释器上。我认为用另一种语言,我可能会让它每秒处理数十万行而不是每分钟。所以我想用一种实现速度更快的语言来编写它。

但我不确定选择什么语言。

我一直无法弄清楚如何在 SQL 中将其表达为单个查询,实际上我对 MySQL 的能力甚至只是 select * from foo into outfile 'bar';输入数据的能力不感兴趣。

C 是一个显而易见的选择,但是诸如line.split()对 2 元组列表进行排序和制作哈希表之类的事情需要编写一些标准库中没有的代码,所以我最终会得到 100 行或更多的代码,而不是 14 行。

C++ 似乎是一个更好的选择(它在标准库中有字符串、映射、对和向量),但似乎使用 STL 的代码会更加混乱。

OCaml 会很好,但它是否具有等效的line.split(),我会对其地图的性能感到难过吗?

Common Lisp 可能有用吗?

是否有类似 Matlab 的数据库计算等价物,可以让我将循环向下推入快速代码?有人试过吗?

(编辑:通过提供一些示例输入和输出数据来回应 davethegr8 的评论,并修复了 Python 程序中的错误!)

(附加编辑:哇,这个评论线程到目前为止真的很棒。谢谢大家!)

编辑:

2007 年在 sbcl-devel 上提出了一个非常相似的问题(感谢 Rainer!),这是awkWill Hartung 用于生成一些测试数据的脚本(尽管它没有真实数据的 Zipfian 分布):

BEGIN {
 for (i = 0; i < 27000000; i++) {
  v = rand();
  k = int(rand() * 100);
  print k " " v " " i;
 }
 exit;
}
4

18 回答 18

9

我很难相信任何对数据没有任何先验知识的脚本(与预加载此类信息的 MySql 不同)都会比 SQL 方法更快。

除了解析输入所花费的时间之外,脚本还需要“保持”按数组等对顺序进行排序......

以下是对在 SQL 中什么应该运行得相当快的第一次猜测,假设表的 aa、bb、cc 列上的索引 (*) 按该顺序排列。(一种可能的替代方法是“aa, bb DESC, cc”索引

(*) 该索引可以聚集也可以不聚集,不影响后面的查询。是否选择集群,以及是否需要“aa、bb、cc”单独索引取决于用例、表中行的大小等。

SELECT T1.aa, T1.bb, T1.cc , COUNT(*)
FROM tblAbc T1
LEFT OUTER JOIN tblAbc T2 ON T1.aa = T2.aa AND 
         (T1.bb < T2.bb OR(T1.bb = T2.bb AND T1.cc < T2.cc))
GROUP BY T1.aa, T1.bb, T1.cc
HAVING COUNT(*) < 5  -- trick, remember COUNT(*) goes 1,1,2,3,...
ORDER BY T1.aa, T1.bb, T1.cc, COUNT(*) DESC

这个想法是计算给定aa值内有多少条记录小于self。但是有一个小技巧:我们需要使用 LEFT OUTER 连接,以免我们丢弃 bb 值最大的记录或最后一个(可能恰好是前 5 个之一)。作为左连接的结果,COUNT(*) 值计数为 1、1、2、3、4 等,因此 HAVING 测试为“<5”以有效选择前 5 个。

为了模拟 OP 的示例输出,ORDER BY 在 COUNT() 上使用 DESC,可以将其删除以获得更传统的前 5 名列表。此外,如果需要,可以删除选择列表中的 COUNT(),这不会影响查询的逻辑和正确排序的能力。

另请注意,该查询在处理关系方面是确定性的,即,当给定的记录集具有相同的 bb 值(在 aa 组内)时;我认为当输入数据的顺序发生变化时,Python 程序可能会提供稍微不同的输出,这是因为它偶尔会截断排序字典。

真正的解决方案:基于 SQL 的过程方法

上面描述的自连接方法演示了如何使用声明性语句来表达 OP 的要求。然而,这种方法在某种意义上是幼稚的,因为它的性能大致与每个 aa“类别”内记录数的平方和有关。(不是 O(n^2),而是大约 O((n/a)^2),其中 a 是 aa 列的不同值的数量)换句话说,它对数据表现良好,因此平均关联的记录数给定的 aa 值不超过几十个。如果数据使得 aa 列没有选择性,则以下方法更适合。它利用了 SQL 的高效排序框架,同时实现了一种难以以声明方式表达的简单算法。通过引入对下一个 aa 值的简单二进制搜索,通过在光标中向前看(有时向后看......),可以进一步改进对于每个/大多数 aa“类别”记录数量特别多的数据集。对于 tblAbc 中相对于总行数的 aa“类别”数量较低的情况,请参见下一个方法之后的另一种方法。

DECLARE @aa AS VARCHAR(10), @bb AS INT, @cc AS VARCHAR(10)
DECLARE @curAa AS VARCHAR(10)
DECLARE @Ctr AS INT

DROP TABLE  tblResults;
CREATE TABLE tblResults
(  aa VARCHAR(10),
   bb INT,
   cc VARCHAR(10)
);

DECLARE abcCursor CURSOR 
  FOR SELECT aa, bb, cc
  FROM tblABC
  ORDER BY aa, bb DESC, cc
  FOR READ ONLY;

OPEN abcCursor;

SET @curAa = ''

FETCH NEXT FROM abcCursor INTO @aa, @bb, @cc;
WHILE @@FETCH_STATUS = 0
BEGIN
    IF @curAa <> @aa
    BEGIN
       SET @Ctr = 0
       SET @curAa = @aa
    END
    IF @Ctr < 5
    BEGIN
       SET @Ctr = @Ctr + 1;
       INSERT tblResults VALUES(@aa, @bb, @cc);
    END
    FETCH NEXT FROM AbcCursor INTO @aa, @bb, @cc;
END;

CLOSE abcCursor;
DEALLOCATE abcCursor;

SELECT * from tblResults
ORDER BY aa, bb, cc    -- OR .. bb DESC ... for a more traditional order.

对于 aa 非常不具选择性的情况,可以替代上述方法。换句话说,当我们的 aa“类别”相对较少时。这个想法是遍历不同类别的列表并为每个值运行“LIMIT”(MySql)“TOP”(MSSQL)查询。出于参考目的,在 MSSQL 8.0 上,在相对较旧/较弱的主机上,6100 万条记录分为 45 个 aa 值的 tblAbc 运行了 63 秒。

DECLARE @aa AS VARCHAR(10)
DECLARE @aaCount INT

DROP TABLE  tblResults;
CREATE TABLE tblResults
(  aa VARCHAR(10),
   bb INT,
   cc VARCHAR(10)
);

DECLARE aaCountCursor CURSOR 
  FOR SELECT aa, COUNT(*)
  FROM tblABC
  GROUP BY aa
  ORDER BY aa
  FOR READ ONLY;
OPEN aaCountCursor;


FETCH NEXT FROM aaCountCursor INTO @aa, @aaCount
WHILE @@FETCH_STATUS = 0
BEGIN
    INSERT tblResults 
       SELECT TOP 5 aa, bb, cc
       FROM tblproh
       WHERE aa = @aa
       ORDER BY aa, bb DESC, cc

    FETCH NEXT FROM aaCountCursor INTO @aa, @aaCount;
END;

CLOSE aaCountCursor
DEALLOCATE aaCountCursor

SELECT * from tblResults
ORDER BY aa, bb, cc    -- OR .. bb DESC ... for a more traditional order.

关于是否需要索引的问题. (cf OP 的评论)当仅仅运行“SELECT * FROM myTable”时,表扫描实际上是最快的方法,无需担心索引。然而,SQL 通常更适合这类事情的主要原因(除了作为数据首先积累的存储库,而任何外部解决方案都需要考虑导出相关数据的时间),是它可以依靠索引来避免扫描。许多通用语言更适合处理原始处理,但它们正在与 SQL 进行一场不公平的战斗,因为它们需要重建 SQL 在其数据收集/导入阶段收集的数据的任何先验知识。由于排序通常是一项耗时且有时会占用空间的任务,

此外,即使没有预建索引,现代查询优化器也可以决定涉及创建临时索引的计划。而且,由于排序是 DDMS 的固有部分,SQL 服务器在该领域通常是高效的。

那么... SQL 更好吗?

这就是说,如果我们试图比较纯 ETL 作业的 SQL 和其他语言,即处理(未索引表)作为其输入以执行各种转换和过滤,那么多线程实用程序很可能写在说C,并利用高效的排序库,可能会更快。决定 SQL 与非 SQL 方法的决定性问题是数据的位置以及它最终应该驻留的位置。如果我们只是将文件转换为向下提供“链”,则外部程序更适合。如果我们拥有或需要 SQL 服务器中的数据,那么只有极少数情况下值得将其导出和外部处理。

于 2009-09-23T18:50:07.520 回答
6

您可以使用更智能的数据结构并仍然使用 python。我已经在我的机器上运行了你的参考实现和我的 python 实现,甚至比较了输出以确保结果。

这是你的:

$ time python ./ref.py  < data-large.txt  > ref-large.txt

real 1m57.689s
user 1m56.104s
sys 0m0.573s

这是我的:

$ time python ./my.py  < data-large.txt  > my-large.txt

real 1m35.132s
user 1m34.649s
sys 0m0.261s
$ diff my-large.txt ref-large.txt 
$ echo $?
0

这是来源:

#!/usr/bin/python
# -*- coding: utf-8; -*-
import sys
import heapq

top_5 = {}

for line in sys.stdin:
    aa, bb, cc = line.split()

    # We want the top 5 for each distinct value of aa.  There are
    # hundreds of thousands of values of aa.
    bb = float(bb)
    if aa not in top_5: top_5[aa] = []
    current = top_5[aa]
    if len(current) < 5:
        heapq.heappush(current, (bb, cc))
    else:
        if current[0] < (bb, cc):
            heapq.heapreplace(current, (bb, cc))

for aa in top_5:
    current = top_5[aa]
    while len(current) > 0:
        bb, cc = heapq.heappop(current)
        print aa, bb, cc

更新:了解你的极限。我还计时了一个 noop 代码,以了解使用类似于原始代码的最快的 Python 解决方案:

$ time python noop.py < data-large.txt  > noop-large.txt

real    1m20.143s
user    1m19.846s
sys 0m0.267s

还有 noop.py 本身:

#!/usr/bin/python
# -*- coding: utf-8; -*-
import sys
import heapq

top_5 = {}

for line in sys.stdin:
    aa, bb, cc = line.split()

    bb = float(bb)
    if aa not in top_5: top_5[aa] = []
    current = top_5[aa]
    if len(current) < 5:
        current.append((bb, cc))

for aa in top_5:
    current = top_5[aa]
    current.sort()
    for bb, cc in current[-5:]:
        print aa, bb, cc
于 2009-09-24T06:08:23.267 回答
3

这是 Common Lisp 中的草图

请注意,对于长文件,使用 READ-LINE 会受到惩罚,因为它会为每一行提供一个新字符串。然后使用使用行缓冲区的浮动的 READ-LINE 派生类之一。您也可以检查是否希望哈希表区分大小写。

第二版

不再需要拆分字符串,因为我们在这里进行。它是低级代码,希望可以提高一些速度。它检查一个或多个空格作为字段分隔符和制表符。

(defun read-a-line (stream)
  (let ((line (read-line stream nil nil)))
    (flet ((delimiter-p (c)
             (or (char= c #\space) (char= c #\tab))))
      (when line
        (let* ((s0 (position-if     #'delimiter-p line))
               (s1 (position-if-not #'delimiter-p line :start s0))
               (s2 (position-if     #'delimiter-p line :start (1+ s1)))
               (s3 (position-if     #'delimiter-p line :from-end t)))
          (values (subseq line 0 s0)
                  (list (read-from-string line nil nil :start s1 :end s2)
                        (subseq line (1+ s3)))))))))

上面的函数返回两个值:键和其余的列表。

(defun dbscan (top-5-table stream)
   "get triples from each line and put them in the hash table"
  (loop with aa = nil and bbcc = nil do
    (multiple-value-setq (aa bbcc) (read-a-line stream))
    while aa do
    (setf (gethash aa top-5-table)
          (let ((l (merge 'list (gethash aa top-5-table) (list bbcc)
                          #'> :key #'first)))
             (or (and (nth 5 l) (subseq l 0 5)) l)))))


(defun dbprint (table output)
  "print the hashtable contents"
  (maphash (lambda (aa value)
              (loop for (bb cc) in value
                    do (format output "~a ~a ~a~%" aa bb cc)))
           table))

(defun dbsum (input &optional (output *standard-output*))
  "scan and sum from a stream"
  (let ((top-5-table (make-hash-table :test #'equal)))
    (dbscan top-5-table input)
    (dbprint top-5-table output)))


(defun fsum (infile outfile)
   "scan and sum a file"
   (with-open-file (input infile :direction :input)
     (with-open-file (output outfile
                      :direction :output :if-exists :supersede)
       (dbsum input output))))

一些测试数据

(defun create-test-data (&key (file "/tmp/test.data") (n-lines 100000))
  (with-open-file (stream file :direction :output :if-exists :supersede)
    (loop repeat n-lines
          do (format stream "~a ~a ~a~%"
                     (random 1000) (random 100.0) (random 10000)))))

; (创建测试数据)

(defun test ()
  (time (fsum "/tmp/test.data" "/tmp/result.data")))

第三版,LispWorks

使用一些 SPLIT-STRING 和 PARSE-FLOAT 函数,否则使用通用 CL。

(defun fsum (infile outfile)
  (let ((top-5-table (make-hash-table :size 50000000 :test #'equal)))
    (with-open-file (input infile :direction :input)
      (loop for line = (read-line input nil nil)
            while line do
            (destructuring-bind (aa bb cc) (split-string '(#\space #\tab) line)
              (setf bb (parse-float bb))
              (let ((v (gethash aa top-5-table)))
                (unless v
                  (setf (gethash aa top-5-table)
                        (setf v (make-array 6 :fill-pointer 0))))
                (vector-push (cons bb cc) v)
                (when (> (length v) 5)
                  (setf (fill-pointer (sort v #'> :key #'car)) 5))))))
    (with-open-file (output outfile :direction :output :if-exists :supersede)
      (maphash (lambda (aa value)
                 (loop for (bb . cc) across value do
                       (format output "~a ~f ~a~%" aa bb cc)))
               top-5-table))))    
于 2009-09-23T19:12:02.453 回答
3

这在我的机器上花了 45.7 秒,有 2700 万行数据,如下所示:

42 0.49357 0
96 0.48075 1
27 0.640761 2
8 0.389128 3
75 0.395476 4
24 0.212069 5
80 0.121367 6
81 0.271959 7
91 0.18581 8
69 0.258922 9

您的脚本在此数据上占用了 1m42,c++ 示例也占用了 1m46(g++ t.cpp -ot 来编译它,我对 c++ 一无所知)。

Java 6,这并不重要。输出并不完美,但很容易修复。

package top5;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class Main {

    public static void main(String[] args) throws Exception {
        long start  = System.currentTimeMillis();
        Map<String, Pair[]> top5map = new TreeMap<String, Pair[]>();
        BufferedReader br = new BufferedReader(new FileReader("/tmp/file.dat"));

        String line = br.readLine();
        while(line != null) {
            String parts[] = line.split(" ");

            String key = parts[0];
            double score = Double.valueOf(parts[1]);
            String value = parts[2];
            Pair[] pairs = top5map.get(key);

            boolean insert = false;
            Pair p = null;
            if (pairs != null) {
                insert = (score > pairs[pairs.length - 1].score) || pairs.length < 5;
            } else {
                insert = true;
            }
            if (insert) {
                p = new Pair(score, value);
                if (pairs == null) {
                    pairs = new Pair[1];
                    pairs[0] = new Pair(score, value);
                } else {
                    if (pairs.length < 5) {
                        Pair[] newpairs = new Pair[pairs.length + 1];
                        System.arraycopy(pairs, 0, newpairs, 0, pairs.length);
                        pairs = newpairs;
                    }
                    int k = 0;
                    for(int i = pairs.length - 2; i >= 0; i--) {
                        if (pairs[i].score <= p.score) {
                            pairs[i + 1] = pairs[i];
                        } else {
                            k = i + 1;
                            break;
                        }
                    }
                    pairs[k] = p;
                }
                top5map.put(key, pairs);
            }
            line = br.readLine();
        }
        for(Map.Entry<String, Pair[]> e : top5map.entrySet()) {
            System.out.print(e.getKey());
            System.out.print(" ");
            System.out.println(Arrays.toString(e.getValue()));
        }
        System.out.println(System.currentTimeMillis() - start);
    }

    static class Pair {
        double score;
        String value;

        public Pair(double score, String value) {
            this.score = score;
            this.value = value;
        }

        public int compareTo(Object o) {
            Pair p = (Pair) o;
            return (int)Math.signum(score - p.score);
        }

        public String toString() {
            return String.valueOf(score) + ", " + value;
        }
    }
}

用于伪造数据的 AWK 脚本:

BEGIN {
 for (i = 0; i < 27000000; i++) {
  v = rand();
  k = int(rand() * 100);
  print k " " v " " i;
 }
 exit;
}
于 2009-09-23T22:19:09.237 回答
3

这是另一个 OCaml 版本 - 以速度为目标 - 在 Streams 上带有自定义解析器。太长了,但解析器的某些部分是可重用的。感谢peufeu引发竞争 :)

速度 :

  • 简单的 ocaml - 27 秒
  • 带流解析器的 ocaml - 15 秒
  • c 手动解析器 - 5 秒

编译:

ocamlopt -pp camlp4o code.ml -o caml

代码 :

open Printf

let cmp x y = compare (fst x : float) (fst y)
let digit c = Char.code c - Char.code '0'

let rec parse f = parser
  | [< a=int; _=spaces; b=float; _=spaces; 
       c=rest (Buffer.create 100); t >] -> f a b c; parse f t
  | [< >] -> ()
and int = parser
  | [< ''0'..'9' as c; t >] -> int_ (digit c) t
  | [< ''-'; ''0'..'9' as c; t >] -> - (int_ (digit c) t)
and int_ n = parser
  | [< ''0'..'9' as c; t >] -> int_ (n * 10 + digit c) t
  | [< >] -> n
and float = parser
  | [< n=int; t=frem; e=fexp >] -> (float_of_int n +. t) *. (10. ** e)
and frem = parser
  | [< ''.'; r=frem_ 0.0 10. >] -> r
  | [< >] -> 0.0
and frem_ f base = parser
  | [< ''0'..'9' as c; t >] -> 
      frem_ (float_of_int (digit c) /. base +. f) (base *. 10.) t
  | [< >] -> f
and fexp = parser
  | [< ''e'; e=int >] -> float_of_int e
  | [< >] -> 0.0
and spaces = parser
  | [< '' '; t >] -> spaces t
  | [< ''\t'; t >] -> spaces t
  | [< >] -> ()
and crlf = parser
  | [< ''\r'; t >] -> crlf t
  | [< ''\n'; t >] -> crlf t
  | [< >] -> ()
and rest b = parser
  | [< ''\r'; _=crlf >] -> Buffer.contents b
  | [< ''\n'; _=crlf >] -> Buffer.contents b
  | [< 'c; t >] -> Buffer.add_char b c; rest b t
  | [< >] -> Buffer.contents b

let () =
  let all = Array.make 200 [] in
  let each a b c =
    assert (a >= 0 && a < 200);
    match all.(a) with
    | [] -> all.(a) <- [b,c]
    | (bmin,_) as prev::tl -> if b > bmin then
      begin
        let m = List.sort cmp ((b,c)::tl) in
        all.(a) <- if List.length tl < 4 then prev::m else m
      end
  in
  parse each (Stream.of_channel stdin);
  Array.iteri 
    (fun a -> List.iter (fun (b,c) -> printf "%i %f %s\n" a b c))
    all
于 2009-10-01T09:40:44.797 回答
2

非常简单的 Caml(27 * 10^6 行 -- 27 秒,C++ by hrnt -- 29 秒)

open Printf
open ExtLib

let (>>) x f = f x
let cmp x y = compare (fst x : float) (fst y)
let wsp = Str.regexp "[ \t]+"

let () =
  let all = Hashtbl.create 1024 in
  Std.input_lines stdin >> Enum.iter (fun line ->
    let [a;b;c] = Str.split wsp line in
    let b = float_of_string b in
    try
      match Hashtbl.find all a with
      | [] -> assert false
      | (bmin,_) as prev::tl -> if b > bmin then
        begin
          let m = List.sort ~cmp ((b,c)::tl) in
          Hashtbl.replace all a (if List.length tl < 4 then prev::m else m)
        end
    with Not_found -> Hashtbl.add all a [b,c]
  );
  all >> Hashtbl.iter (fun a -> List.iter (fun (b,c) -> printf "%s %f %s\n" a b c))
于 2009-09-25T13:12:46.753 回答
2

到目前为止,在我测试过的这个线程中的所有程序中, OCaml 版本是最快的,也是最短的。(基于代码行的测量有点模糊,但并不明显比 Python 版本或 C 或 C++ 版本长,而且明显更快。)

注意:我知道为什么我之前的运行时如此不确定!我的 CPU 散热器被灰尘堵塞,​​导致我的 CPU 过热。现在我得到了很好的确定性基准时间。我想我现在已经重做了这个线程中的所有计时测量,因为我有一种可靠的计时方法。

以下是目前不同版本的时序,在 2700 万行 630 兆字节的输入数据文件上运行。我在双核 1.6GHz Celeron 上使用 Ubuntu Intrepid Ibex,运行 32 位版本的操作系统(以太网驱动程序在 64 位版本中损坏)。我对每个程序运行了五次,并报告了这五次尝试的次数范围。我正在使用 Python 2.5.2、OpenJDK 1.6.0.0、OCaml 3.10.2、GCC 4.3.2、SBCL 1.0.8.debian 和 Octave 3.0.1。

  • SquareCog 的 Pig 版本:尚未测试(因为我不能只是apt-get install pig),7行代码。
  • mjv 的纯 SQL 版本:尚未测试,但我预计运行时间为几天;7行代码。
  • ygrek 的 OCaml 版本:68.7 秒± 0.9,15行代码。
  • 我的 Python 版本:169 秒±4 或86 秒±2 与 Psyco,在16行代码中。
  • Abbot 基于堆的 Python 版本:18行代码中177 秒±5 ,或 Psyco 83 秒±5。
  • 下面是我的 C 版本,由 GNU 组成sort -n90 + 5.5 秒(±3,±0.1),但由于 GNU 的不足sort,在22行代码(包括一行 shell)中给出了错误的答案。
  • hrnt 的 C++ 版本:217 秒± 3,25行代码。
  • mjv 的另一种基于 SQL 的过程方法:尚未测试,26行代码。
  • mjv 的第一个基于 SQL 的过程方法:尚未测试,29行代码。
  • 带有 Psyco的 peufeu 的Python 版本181 秒±4,大约30行代码。
  • Rainer Joswig 的 Common Lisp 版本:42行代码,耗时478 秒(仅运行一次) 。
  • abbot's noop.py,故意给出不正确的结果来建立一个下限:尚未测试,15行代码。
  • Will Hartung 的 Java 版本:96 秒±10 英寸,根据 David A. Wheeler 的 SLOCCount,74行代码。
  • Greg 的 Matlab 版本:不起作用。
  • Schuyler Erle 关于在其中一个 Python 版本上使用 Pyrex 的建议:尚未尝试。

我怀疑方丈的版本对我来说比对他们来说更糟糕,因为真实的数据集具有高度不均匀的分布:正如我所说,一些aa值(“玩家”)有数千行,而另一些只有一个。

关于 Psyco:我将 Psyco 应用到我的原始代码(和方丈的版本)中,方法是把它放在一个main函数中,它本身将时间缩短到大约 140 秒,并在调用psyco.full()之前先调用main()。这增加了大约四行代码。

几乎可以使用 GNU 解决这个问题sort,如下:

kragen@inexorable:~/devel$ time LANG=C sort -nr infile -o sorted

real    1m27.476s
user    0m59.472s
sys 0m8.549s
kragen@inexorable:~/devel$ time ./top5_sorted_c < sorted > outfile

real    0m5.515s
user    0m4.868s
sys 0m0.452s

top5_sorted_c是这个简短的 C 程序:

#include <ctype.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

enum { linesize = 1024 };

char buf[linesize];
char key[linesize];             /* last key seen */

int main() {
  int n = 0;
  char *p;

  while (fgets(buf, linesize, stdin)) {
    for (p = buf; *p && !isspace(*p); p++) /* find end of key on this line */
      ;
    if (p - buf != strlen(key) || 0 != memcmp(buf, key, p - buf)) 
      n = 0;                    /* this is a new key */
    n++;

    if (n <= 5)               /* copy up to five lines for each key */
      if (fputs(buf, stdout) == EOF) abort();

    if (n == 1) {               /* save new key in `key` */
      memcpy(key, buf, p - buf);
      key[p-buf] = '\0';
    }
  }
  return 0;
}

我首先尝试用 C++ 编写该程序,如下所示,我得到的运行时间要慢得多,为 33.6±2.3 秒而不是 5.5±0.1 秒:

#include <map>
#include <iostream>
#include <string>

int main() {
  using namespace std;
  int n = 0;
  string prev, aa, bb, cc;

  while (cin >> aa >> bb >> cc) {
    if (aa != prev) n = 0;
    ++n;
    if (n <= 5) cout << aa << " " << bb << " " << cc << endl;
    prev = aa;
  }
  return 0;
}

我确实说差不多。问题是sort -n大多数数据都可以,但是在尝试0.333.78168e-05. 所以为了获得这种性能并真正解决问题,我需要一个更好的排序。

无论如何,我觉得我在抱怨,但是排序和过滤方法比 Python 程序快大约 5 倍,而来自 hrnt 的优雅 STL 程序实际上要慢一些——似乎有某种中的严重低效率<iostream>。我不知道其他 83% 的运行时在那个小 C++ 版本的过滤器中去哪里了,但它没有任何用处,这让我怀疑我也不知道它在 hrntstd::map版本中的去向。那个版本也可以加速 5 倍吗?因为那会很酷。它的工作集可能比我的二级缓存大,但碰巧它可能不是。

对 callgrind 的一些调查表明,我在 C++ 中的过滤器程序在operator >>. 我可以识别每个输入字节至少 10 个函数调用,cin.sync_with_stdio(false);但没有帮助。这可能意味着我可以通过更有效地解析输入行来让 hrnt 的 C 程序运行得更快。

编辑:kcachegrind 声称 hrnt 的程序执行 62% 的指令(在一个小的 157000 行输入文件上)doubleistream. 其中很大一部分是因为 istreams 库在尝试解析double. 疯狂的。我会误解 kcachegrind 的输出吗?

无论如何,还有其他建议吗?

于 2009-09-24T02:39:38.133 回答
1

有没有人试过用 awk 来解决这个问题。特别是'mawk'?根据这篇博客文章,它甚至应该比 Java 和 C++ 更快:http: //anyall.org/blog/2009/09/dont-mawk-awk-the-fastest-and-most-elegant-big-data-芒格语言/

编辑:只是想澄清一下,该博客文章中的唯一声明是,对于特别适合 awk 样式处理的特定类别的问题,mawk 虚拟机可以击败 Java 和 C++ 中的“香草”实现。

于 2009-09-24T04:34:01.560 回答
1

这是一个 C++ 解决方案。但是,我没有很多数据来测试它,所以我不知道它实际上有多快。

[编辑] 感谢这个线程中的 awk 脚本提供的测试数据,我设法清理并加速了代码。我并不是要找出最快的版本——目的是提供一个相当快的版本,它不像人们认为的 STL 解决方案那样丑陋。

这个版本应该比第一个版本快两倍(在大约 35 秒内通过 2700 万行)。Gcc 用户,记得用 -O2 编译它。

#include <map>
#include <iostream>
#include <functional>
#include <utility>
#include <string>
int main() {
  using namespace std;
  typedef std::map<string, std::multimap<double, string> > Map;
  Map m;
  string aa, cc;
  double bb;
  std::cin.sync_with_stdio(false); // Dunno if this has any effect, but anyways.

  while (std::cin >> aa >> bb >> cc)
    {
      if (m[aa].size() == 5)
        {
          Map::mapped_type::iterator iter = m[aa].begin();
          if (bb < iter->first)
            continue;
          m[aa].erase(iter);
        }
      m[aa].insert(make_pair(bb, cc));
    }
  for (Map::const_iterator iter = m.begin(); iter != m.end(); ++iter)
    for (Map::mapped_type::const_iterator iter2 = iter->second.begin();
         iter2 != iter->second.end();
         ++iter2)
      std::cout << iter->first << " " << iter2->first << " " << iter2->second <<
 std::endl;

}
于 2009-09-23T21:32:51.003 回答
1

有趣的是,最初的 Python 解决方案是迄今为止看起来最干净(尽管 C++ 示例很接近)。

在您的原始代码上使用 Pyrex 或 Psyco 怎么样?

于 2009-09-24T00:37:18.707 回答
1

既然您询问了 Matlab,这就是我所做的事情,就像您所要求的那样。我试图在没有任何 for 循环的情况下做到这一点,但我确实有一个,因为我不想花很长时间来处理它。如果您担心内存问题,那么您可以使用 fscanf 以块的形式从流中提取数据,而不是读取整个缓冲区。

fid = fopen('fakedata.txt','r');
tic
A=fscanf(fid,'%d %d %d\n');
A=reshape(A,3,length(A)/3)';  %Matlab reads the data into one long column'
Names = unique(A(:,1));
for i=1:length(Names)
    indices = find(A(:,1)==Names(i));   %Grab all instances of key i
    [Y,I] = sort(A(indices,2),1,'descend'); %sort in descending order of 2nd record
    A(indices(I(1:min([5,length(indices(I))]))),:) %Print the top five
end
toc
fclose(fid)
于 2009-09-24T23:14:46.963 回答
1

说到计算时间的下限:

让我们分析一下我上面的算法:

for each row (key,score,id) :
    create or fetch a list of top scores for the row's key
    if len( this list ) < N
        append current
    else if current score > minimum score in list
        replace minimum of list with current row
        update minimum of all lists if needed

令 N 为 top-N 中的 N 令 R 为数据集中的行数 令 K 为不同键的数量

我们可以做出哪些假设?

R * sizeof( row ) > RAM 或者至少它足够大,我们不想全部加载,使用散列按键分组,并对每个 bin 进行排序。出于同样的原因,我们不会对所有内容进行排序。

Kragen 喜欢哈希表,所以 K * sizeof(per-key state) << RAM,很可能它适合 L2/3 缓存

Kragen 没有排序,所以 K*N << R 即每个键的条目多于 N

(注意:A << B 表示 A 相对于 B 小)

如果数据是随机分布的,那么

在少量行之后,大多数行将被每个键的最小条件拒绝,成本是每行 1 次比较。

所以每行的成本是 1 次哈希查找 + 1 次比较 + epsilon *(列表插入 + (N+1) 次比较为最小值)

如果分数具有随机分布(例如在 0 和 1 之间)并且上述条件成立,则两个 epsilon 都将非常小。

实验证明:

上面的 2700 万行数据集在 top-N 列表中产生了 5933 次插入。所有其他行都被一个简单的键查找和比较拒绝。ε = 0.0001

粗略地说,成本是每行 1 次查找 + 比较,这需要几纳秒。

在当前的硬件上,与 IO 成本,尤其是解析成本相比,这不可能可以忽略不计。

于 2009-09-30T12:58:14.117 回答
0

这不就是这么简单吗

 SELECT DISTINCT aa, bb, cc FROM tablename ORDER BY bb DESC LIMIT 5

?

当然,如果不根据数据进行测试,很难判断什么是最快的。如果这是您需要快速运行的东西,那么优化您的数据库以使查询更快而不是优化查询可能是有意义的。

而且,当然,如果您仍然需要平面文件,您不妨使用它。

于 2009-09-23T19:09:24.650 回答
0

我喜欢午休挑战。这是一个 1 小时的实现。

好的,当您不想做一些非常奇特的废话,比如加法时,没有什么能阻止您使用自定义的 base-10 浮点格式,其唯一实现的运算符是比较,对吧?哈哈。

我有一些来自以前项目的 fast-atoi 代码,所以我只是导入了它。

http://www.copypastecode.com/11541/

这段 C 源代码解析 580MB 的输入文本(2700 万行)大约需要 6.6 秒,其中一半时间是 fgets,哈哈。然后计算 top-n 大约需要 0.05 秒,但我不确定,因为 top-n 所需的时间小于计时器噪声。

您将成为测试它的正确性的人,尽管 XDDDDDDDDDDD

有趣吧?

于 2009-09-28T12:58:42.437 回答
0

选择“前 5 名”看起来像这样。请注意,没有排序。top_5 字典中的任何列表也不会超过 5 个元素。

from collections import defaultdict
import sys

def keep_5( aList, aPair ):
    minbb= min( bb for bb,cc in aList )
    bb, cc = aPair
    if bb < minbb: return aList
    aList.append( aPair )
    min_i= 0
    for i in xrange(1,6):
        if aList[i][0] < aList[min_i][0]
            min_i= i
    aList.pop(min_i)
    return aList


top_5= defaultdict(list)
for row in sys.stdin:
    aa, bb, cc = row.split()
    bb = float(bb)
    if len(top_5[aa]) < 5:
        top_5[aa].append( (bb,cc) )
    else:
        top_5[aa]= keep_5( top_5[aa], (bb,cc) )
于 2009-09-23T19:23:02.703 回答
0

Pig 版本会是这样的(未经测试):

 Data = LOAD '/my/data' using PigStorage() as (aa:int, bb:float, cc:chararray);
 grp = GROUP Data by aa;
 topK = FOREACH grp (
     sorted = ORDER Data by bb DESC;
     lim = LIMIT sorted 5;
     GENERATE group as aa, lim;
)
STORE topK INTO '/my/output' using PigStorage();

Pig 没有针对性能进行优化;它的目标是使用并行执行框架来处理多 TB 数据集。它确实具有本地模式,因此您可以尝试一下,但我怀疑它会击败您的脚本。

于 2009-09-24T05:50:46.233 回答
0

那是一个很好的午休挑战,他,他。

Top-N 是众所周知的数据库杀手。如上面的帖子所示,没有办法用普通的 SQL 有效地表达它。

至于各种实现,您必须记住,其中缓慢的部分不是排序或 top-N,而是文本的解析。你最近看过 glibc 的 strtod() 的源代码吗?

例如,我得到,使用 Python :

Read data : 80.5  s
My TopN   : 34.41 s
HeapTopN  : 30.34 s

无论您使用哪种语言,您很可能永远不会获得非常快的计时,除非您的数据采用某种比文本解析速度快得多的格式。例如,将测试数据加载到 postgres 需要 70 秒,其中大部分也是文本解析。

如果你的 topN 中的 N 很小,比如 5,那么下面我的算法的 C 实现可能是最快的。如果 N 可以更大,堆是一个更好的选择。

因此,由于您的数据可能在数据库中,而您的问题是获取数据,而不是实际的处理,如果您真的需要一个超快的 TopN 引擎,您应该为您编写一个 C 模块选择的数据库。由于 postgres 对任何事情都更快,我建议使用 postgres,而且为它编写 C 模块并不难。

这是我的 Python 代码:

import random, sys, time, heapq

ROWS = 27000000

def make_data( fname ):
    f = open( fname, "w" )
    r = random.Random()
    for i in xrange( 0, ROWS, 10000 ):
        for j in xrange( i,i+10000 ):
            f.write( "%d    %f    %d\n" % (r.randint(0,100), r.uniform(0,1000), j))
        print ("write: %d\r" % i),
        sys.stdout.flush()
    print

def read_data( fname ):
    for n, line in enumerate( open( fname ) ):
        r = line.strip().split()
        yield int(r[0]),float(r[1]),r[2]
        if not (n % 10000 ):
            print ("read: %d\r" % n),
            sys.stdout.flush()
    print

def topn( ntop, data ):
    ntop -= 1
    assert ntop > 0
    min_by_key = {}
    top_by_key = {}
    for key,value,label in data:
        tup = (value,label)
        if key not in top_by_key:
            # initialize
            top_by_key[key] = [ tup ]
        else:
            top = top_by_key[ key ]
            l    = len( top )
            if l > ntop:
                # replace minimum value in top if it is lower than current value
                idx = min_by_key[ key ]
                if top[idx] < tup:
                    top[idx] = tup
                    min_by_key[ key ] = top.index( min( top ) )
            elif l < ntop:
                # fill until we have ntop entries
                top.append( tup )
            else:
                # we have ntop entries in list, we'll have ntop+1
                top.append( tup )
                # initialize minimum to keep
                min_by_key[ key ] = top.index( min( top ) )

    # finalize:
    return dict( (key, sorted( values, reverse=True )) for key,values in top_by_key.iteritems() )

def grouptopn( ntop, data ):
    top_by_key = {}
    for key,value,label in data:
        if key in top_by_key:
            top_by_key[ key ].append( (value,label) )
        else:
            top_by_key[ key ] = [ (value,label) ]

    return dict( (key, sorted( values, reverse=True )[:ntop]) for key,values in top_by_key.iteritems() )

def heaptopn( ntop, data ):
    top_by_key = {}
    for key,value,label in data:
        tup = (value,label)
        if key not in top_by_key:
            top_by_key[ key ] = [ tup ]
        else:
            top = top_by_key[ key ]
            if len(top) < ntop:
                heapq.heappush(top, tup)
            else:
                if top[0] < tup:
                    heapq.heapreplace(top, tup)

    return dict( (key, sorted( values, reverse=True )) for key,values in top_by_key.iteritems() )

def dummy( data ):
    for row in data:
        pass

make_data( "data.txt" )

t = time.clock()
dummy( read_data( "data.txt" ) )
t_read = time.clock() - t

t = time.clock()
top_result = topn( 5, read_data( "data.txt" ) )
t_topn = time.clock() - t

t = time.clock()
htop_result = heaptopn( 5, read_data( "data.txt" ) )
t_htopn = time.clock() - t

# correctness checking :
for key in top_result:
    print key, " : ", "        ".join (("%f:%s"%(value,label)) for (value,label) in    top_result[key])
    print key, " : ", "        ".join (("%f:%s"%(value,label)) for (value,label) in htop_result[key])

print
print "Read data :", t_read
print "TopN :     ", t_topn - t_read
print "HeapTopN : ", t_htopn - t_read

for key in top_result:
    assert top_result[key] == htop_result[key]
于 2009-09-26T13:19:37.470 回答
0

好吧,请喝杯咖啡并阅读 strtod 的源代码——这令人难以置信,但需要,如果你想浮动 -> 文本 -> 浮动以返回与你开始时相同的浮动......真的......

解析整数要快得多(虽然在 python 中没有那么多,但在 C 中,是的)。

无论如何,将数据放入 Postgres 表中:

SELECT count( key ) FROM the dataset in the above program

=> 7 秒(因此读取 27M 条记录需要 7 秒)

CREATE INDEX topn_key_value ON topn( key, value );

191 秒

CREATE TEMPORARY TABLE topkeys AS SELECT key FROM topn GROUP BY key;

12 秒

(您也可以使用索引更快地获取“键”的不同值,但它需要一些轻量级的 plpgsql 黑客攻击)

CREATE TEMPORARY TABLE top AS SELECT (r).* FROM (SELECT (SELECT b AS r FROM topn b WHERE b.key=a.key ORDER BY value DESC LIMIT 1) AS r FROM topkeys a) foo;

时间:15,310 毫秒

INSERT INTO top SELECT (r).* FROM (SELECT (SELECT b AS r FROM topn b WHERE b.key=a.key ORDER BY value DESC LIMIT 1 OFFSET 1) AS r FROM topkeys a) foo;

时间:17,853 毫秒

INSERT INTO top SELECT (r).* FROM (SELECT (SELECT b AS r FROM topn b WHERE b.key=a.key ORDER BY value DESC LIMIT 1 OFFSET 2) AS r FROM topkeys a) foo;

时间:13,983 毫秒

INSERT INTO top SELECT (r).* FROM (SELECT (SELECT b AS r FROM topn b WHERE b.key=a.key ORDER BY value DESC LIMIT 1 OFFSET 3) AS r FROM topkeys a) foo;

温度:16,860 毫秒

INSERT INTO top SELECT (r).* FROM (SELECT (SELECT b AS r FROM topn b WHERE b.key=a.key ORDER BY value DESC LIMIT 1 OFFSET 4) AS r FROM topkeys a) foo;

时间:17,651 毫秒

INSERT INTO top SELECT (r).* FROM (SELECT (SELECT b AS r FROM topn b WHERE b.key=a.key ORDER BY value DESC LIMIT 1 OFFSET 5) AS r FROM topkeys a) foo;

时间:19,216 毫秒

SELECT * FROM top ORDER BY key,value;

如您所见,计算 top-n 非常快(假设 n 很小),但创建(强制)索引非常慢,因为它涉及完整排序。

您最好的选择是使用一种快速解析的格式(二进制,或为您的数据库编写自定义 C 聚合,恕我直言,这将是最佳选择)。如果 python 可以在 1 秒内完成,C 程序中的运行时间不应超过 1 秒。

于 2009-09-27T23:21:10.247 回答