
Situation

I have a 13-million-line CSV on which I want to perform logistic regression (incanter) for each group. My file looks like this (the values are just a sample):

ID Max Probability
1  1   0.5 
1  5   0.6
1  10  0.99
2  1   0.1
2  7   0.95

So I first read it with a CSV reader; everything is fine.

I then have something like this:

( {"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} etc.

I want to group these values by Id. If I remember correctly, there are around 1.2 million Ids. (I did it in Python with pandas and it is super fast.)

This is my function to read and format the file (it works fine on smaller datasets):

(defn read-file
  []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (->> (group-by :Id content-csv)
         (map (fn [[k v]]
                [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
         (into {}))))

Finally, I want something like this to perform the logistic regression (I'm flexible about the exact shape; :x and :y do not need to be vectors, seqs are fine):

{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} etc.

Problem

I have trouble with the group-by operation. I tried it separately on the output of the CSV reader, and it takes forever when it does not die with a Java heap space error. I thought the problem was my mapv step, but it is the group-by.

I thought about using reduce or reduce-kv, but I do not know how to use these functions for this kind of purpose.
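For what it's worth, a single-pass reduce over the rows could look like this sketch (assuming the keyword keys used in read-file above):

;; Sketch: build {id {:x [...] :y [...]}} in one pass with reduce.
;; (fnil conj []) starts a fresh vector the first time an Id is seen.
(defn group-with-reduce [rows]
  (reduce (fn [acc {:keys [Id Max Probability]}]
            (-> acc
                (update-in [Id :x] (fnil conj []) Max)
                (update-in [Id :y] (fnil conj []) Probability)))
          {}
          rows))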

I do not care about the order of :x and :y in the final result (as long as they stay aligned, i.e. x and y keep the same index, which is not a problem because they come from the same line), nor about the order of the Ids. I read that group-by preserves order; maybe that is what makes the operation so costly?

Here is some sample data, in case anyone has run into this:

(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95}))

Other alternatives

I have other ideas, but I'm not sure they are "Clojure"-friendly.

  • In Python, because of the nature of the function and because the file is already sorted, instead of using group-by I stored the beginning and end index of each group in a dataframe, so that I only had to select the sub-table directly.

  • I can also load a list of the Ids instead of computing it in Clojure, like

    (def ids '("1" "2" etc.

So maybe it is possible to begin with:

{"1" {:x [] :y []} "2" {:x [] :y []} etc.

from the previous seq, and then match the big file against each Id.

I don't know whether that is actually more efficient, though.
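For illustration, that pre-seeded variant could be sketched like this (assuming ids holds every Id as a string); it does essentially the same work as the plain reduce above, so it is probably not faster:

;; Sketch: seed an empty {:x [] :y []} entry per known Id, then fill it in one
;; pass. The ids seq is assumed to be loaded separately, as described above.
(defn group-with-seed [ids rows]
  (reduce (fn [acc {:keys [Id Max Probability]}]
            (-> acc
                (update-in [Id :x] conj Max)
                (update-in [Id :y] conj Probability)))
          (into {} (map (fn [id] [id {:x [] :y []}]) ids))
          rows))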

I have all the other functions for the logistic regression; I just lack this part! Thanks!

EDIT

Thanks for the answers, I finally settled on this solution.

In my project.clj file:

 :jvm-opts ["-Xmx13g"])

Code:

(defn data-group->map [group]
  {(:Id (first group))
   {:x (map :Max group)
    :y (map :Probability group)}})


(defn prob-cumsum [data]
  (cag/fmap
    (fn [x]
      (assoc x :y (reductions + (x :y))))
    data))


(defn process-data-splitter [data]
  (->> (partition-by :Id data)
       (map data-group->map)
       (into {})
       (prob-cumsum)))
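For reference, the cag alias is not defined in the snippet; presumably it points at clojure.algo.generic.functor, whose fmap maps a function over every value of the grouped map. Something like:

;; Assumed require: `cag` as an alias for clojure.algo.generic.functor.
;; (The namespace name below is just a placeholder.)
(ns my-project.core
  (:require [clojure.algo.generic.functor :as cag]))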

I wrapped all my code and it works. The split takes about 5 minutes, but I do not need mega-speed. Memory usage can go up to all of the available memory during file reading, then less for the sigmoid.


1 Answer


If your file is sorted by id, you can use partition-by instead of group-by.

Then your code would look like this:

(defn data-group->map [group]
  [(:Id (first group))
   {:x (mapv :Max group)
    :y (mapv :Probability group)}])

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (->> content-csv
         (partition-by :Id)
         (map data-group->map)
         (into {}))))

That should speed things up. You can then make it even faster with transducers:

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (into {} (comp (partition-by :Id)
                   (map data-group->map))
          content-csv)))

Let's run some tests.

First, generate some huge data shaped like yours:

(def huge-data
  (doall (mapcat #(repeat 
                     1000000
                     {:Id % :Max 1 :Probability 10})
           (range 10))))

That gives us a ten-million-item dataset: a million of {:Id 0 :Max 1 :Probability 10}, a million of {:Id 1 :Max 1 :Probability 10}, and so on.

Now the functions to test:

(defn process-data-group-by [data]
  (->> (group-by :Id data)
       (map (fn [[k v]]
              [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
       (into {})))

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)
       (into {})))

(defn process-data-transducer [data]
  (into {} (comp (partition-by :Id) (map data-group->map)) data))

And the timings:

(do (time (dorun (process-data-group-by huge-data)))
    (time (dorun (process-data-partition-by huge-data)))
    (time (dorun (process-data-transducer huge-data))))

"Elapsed time: 3377.167645 msecs"
"Elapsed time: 3707.03448 msecs"
"Elapsed time: 1462.955152 msecs"

Note that partition-by produces a lazy sequence, while group-by has to realize the whole collection. So if you need your data group by group rather than as one whole map, you can drop the (into {}) and access each group faster:

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)))

Check it out:

user> (time (def processed-data (process-data-partition-by huge-data)))
"Elapsed time: 0.06079 msecs"
#'user/processed-data
user> (time (let [f (first processed-data)]))
"Elapsed time: 302.200571 msecs"
nil
user> (time (let [f (second processed-data)]))
"Elapsed time: 500.597153 msecs"
nil
user> (time (let [f (last processed-data)]))
"Elapsed time: 2924.588625 msecs"
nil
user.core> (time (let [f (last processed-data)]))
"Elapsed time: 0.037646 msecs"
nil
answered Feb 1, 2016 at 12:40