1

简而言之:在 Clojure 中,有没有办法在我编写的自定义序列类型上从标准序列 API(未在 ISeq、IndexedSeq 等任何接口上定义)重新定义函数?


1. 庞大的数据文件

我有以下格式的大文件:

  • n包含条目数的 long(8 个字节)
  • n条目,每个条目由 3 个 long 组成(即 24 个字节)

2.自定义序列

我想对这些条目进行排序。由于我通常不能一次将所有数据保存在内存中,并且我希望对其进行快速顺序访问,因此我编写了一个类似于以下内容的类:

(deftype DataSeq [id
                  ^long cnt
                  ^long i
                  cached-seq]
  clojure.lang.IndexedSeq

  (index [_]     i)
  (count [_]     (- cnt i))
  (seq   [this]  this)
  (first [_]     (first cached-seq))
  (more  [this]  (if-let [s (next this)] s '()))

  (next [_] (if (not= (inc i) cnt)
              (if (next cached-seq)
                (DataSeq. id cnt (inc i) (next cached-seq))
                (DataSeq. id cnt (inc i)
                          (with-open [f (open-data-file id)]
                             ; open a memory mapped byte array on the file
                             ; seek to the exact position to begin reading
                             ; decide on an optimal amount of data to read
                             ; eagerly read and return that amount of data
                          ))))))

主要思想是提前读取列表中的一堆条目,然后从该列表中消费。每当缓存被完全消耗时,如果还有剩余条目,则从新缓存列表中的文件中读取它们。就那么简单。

为了创建这样一个序列的实例,我使用了一个非常简单的函数,例如:

(defn ^DataSeq load-data [id]
  (next (DataSeq. id (count-entries id) -1 [])))
; count-entries is a trivial "open file and read a long" memoized

如您所见,数据的格式使我能够count非常简单有效地实现。

3.drop可能是 O(1)

本着同样的精神,我想重新实现drop. 这些数据文件的格式允许我drop在 O(1) 中重新实现(而不是标准的 O(n)),如下所示:

  • 如果丢弃的缓存项少于剩余的缓存项,则从缓存中丢弃相同的数量并完成;

  • 如果丢弃超过cnt,则只返回空列表。

  • 否则,只需找出数据文件中的位置,直接跳到该位置,然后从那里读取数据。

我的困难在于它drop的实现方式与count, first,seq等不同。后面的函数调用一个类似命名的静态方法,RT该方法又调用我上面的实现,而前者drop不检查它被调用的序列提供了一个自定义实现。

显然,我可以提供一个名为 any 的函数,但drop它完全符合我的要求,但这会迫使其他人(包括我未来的自己)记住使用它而不是drop每次都使用它,这很糟糕。

所以,问题是:是否可以覆盖的默认行为drop

4. 一种解决方法(我不喜欢)

在写这个问题时,我刚刚想出了一个可能的解决方法:让阅读更加懒惰。自定义序列只会保留一个索引并推迟读取操作,这只会在first被调用时发生。问题是我需要一些可变状态:第一次调用first会导致一些数据被读入缓存,所有后续调用都会从这个缓存返回数据。会有类似的逻辑next:如果有缓存,就next它;否则,不要费心填充它——它会在first再次调用时完成。

这将避免不必要的磁盘读取。然而,这仍然不是最优的——它仍然是 O(n),它很容易是 O(1)。

无论如何,我不喜欢这种解决方法,我的问题仍然悬而未决。有什么想法吗?

谢谢。

4

1 回答 1

1

目前,我实施了上述解决方法。它通过将读取延迟到第一次调用来工作(first),这会将数据存储在本地可变缓存中。

请注意,此版本使用unsynchronized-mutable(以避免每次调用 volatile-readsfirst和第一次调用 volatile-write )。换句话说:不要在线程之间共享。为了使其线程安全,请改用(这会导致小的性能损失)。它仍然可能导致不同线程多次读取相同的数据。为避免这种情况,请改回并确保在读取或写入字段时使用。nextmorefirstvolatile-mutableunsynchronized-mutable(locking this ...)cache

编辑:经过一些(非严格)测试后,似乎引入的开销(locking this ...)类似于不必要的磁盘读取引入的开销(请注意,我正在从快速 SSD 读取,可能已经缓存了部分数据) . 因此,目前(以及对于我的特定硬件)最好的线程安全解决方案是使用易失性缓存。

(deftype DataSeq [id
                  ^long cnt
                  ^long i
                  ^{:unsynchronized-mutable true} cache]
  clojure.lang.IndexedSeq

  (index [_]    i)
  (count [_]    (- cnt i))
  (seq   [this] this)
  (more  [this] (if-let [s (.next this)] s '()))
  (next  [_]    (if (not= (inc i) cnt)
                  (DataSeq. id cnt (inc i) (next cache))))
  (first [_]
    (when-not (seq cache)
      (set! cache
            (with-open [f (open-data-file id)]
              ; open a memory mapped byte array on the file
              ; seek to the exact position to begin reading
              ; decide on an optimal amount of data to read
              ; eagerly read and return that amount of data
            )))
    (first cache)))

仍然困扰我的是,我必须使用可变状态来停止drop(即,“出去,你这无用的数据”)从磁盘读取......

于 2012-10-13T09:27:39.633 回答