5

我正在尝试创建 Lwt 的并发示例并想出了这个小示例

let () =
  Lwt_main.run (
      let start = Unix.time () in
      Lwt_io.open_file Lwt_io.Input "/dev/urandom" >>= fun data_source ->
      Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
      Lwt_list.iter_p
        (fun count ->
         let count = string_of_int count in
         Lwt_io.open_file
           ~flags:[Unix.O_RDWR; Unix.O_CREAT]
           ~perm:0o777
           ~mode:Lwt_io.Output ("serial/file"^ count ^ ".txt") >>= fun h ->
         Lwt_io.read ~count:52428800
                     data_source >>= Lwt_io.write_line h)
        [0;1;2;3;4;5;6;7;8;9] >>= fun () ->
      let finished = Unix.time () in
      Lwt_io.printlf "Execution time took %f seconds" (finished -. start))

编辑:要求 50GB 是:“但是这非常慢而且基本上没用。内部绑定是否需要以某种方式强制?”

编辑:我最初写过要求 50 GB,但它从未完成,现在我有一个不同的问题,要求 50 MB,执行几乎是瞬时的,并且 du -sh 只报告 80k 的目录大小。

编辑:我还尝试了显式关闭文件句柄的代码,结果相同。

我在OS X最新版本并编译

ocamlfind ocamlopt -package lwt.unix main.ml -linkpkg -o Test

(我也试过/dev/random,是的,我正在使用挂钟时间。)

4

2 回答 2

18

所以,你的代码有一些问题。

第一期

主要问题是您Lwt_io.read错误地理解了该功能(没有人可以责怪您!)。

val read : ?count : int -> input_channel -> string Lwt.t
  (** [read ?count ic] reads at most [len] characters from [ic]. It
      returns [""] if the end of input is reached. If [count] is not
      specified, it reads all bytes until the end of input. *)

指定时~count:len,它将读取最多 len字符。最多,意味着它可以读得更少。但是如果count省略该选项,那么它将读取所有数据。我个人觉得这种行为不直观,如果不是奇怪的话。因此,这最多意味着最多len或更少,即不保证它会准确读取len字节。实际上,如果您在程序中添加检查:

 Lwt_io.read ~count:52428800 data_source >>= fun data ->
 Lwt_io.printlf "Read %d bytes" (String.length data) >>= fun () ->
 Lwt_io.write h data >>= fun () ->

你会看到,每次尝试它只会读取4096字节:

Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes

为什么4096?因为这是默认的缓冲区大小。但其实没关系。

第 2 期

Lwt_io module implements a buffered IO. That means that all your writes and reads are not going directly to a file, but are buffered in the memory. That means, that you should remember to flush and close. Your code doesn't close descriptors on finish, so you can end up with a situation when some buffers are left unflushed after a program is terminated. Lwt_io in particular, flushes all buffers before program exit. But you shouldn't rely on this undocumented feature (it may hit you in future, when you will try any other buffered io, like fstreams from standard C library). So, always close your files (another problem is that today file descriptors are the most precious resource, and their leaking is very hard to find).

Issue 3

Don't use /dev/urandom or /dev/random to measure io. For the former you will measure the performance of random number generator, for the latter you will measure the flow of entropy in your machine. Both are quite slow. Depending on the speed of your CPU, you will rarely get more than 16 Mb/s, and it is much less, then Lwt can throughput. Reading from /dev/zero and writing to /dev/null will actually perform all transfers in memory and will show the actual speed, that can be achieved by your program. A well-written program will be still bounded by the kernel speed. In the example program, provided below, this will show an average speed of 700 MB/s.

Issue 4

Don't use the buffered io, if you're really striving for a performance. You will never get the maximum. For example, Lwt_io.read will read first at buffer, then it will create a string and copy data to that string. If you really need some performance, then you should provide your own buffering. In most cases, there is no need for this, as Lwt_io is quite performant. But if you need to process dozens of megabytes per second, or need some special buffering policy (something non-linear), you may need to think about providing your own buffering. The good news is that Lwt_io allows you to do this. You can take a look at an example program, that will measure the performance of Lwt input/output. It mimics a well-known pv program.

Issue 5

You're expecting to get some performance by running threads in parallel. The problem is that in your test there is no place for the concurrency. /dev/random (as well as /dev/zero) is one device that is bounded only by CPU. This is the same, as just calling a random function. It will always be available, so no system call will block on it. Writing to a regular file is also not a good place for concurrency. First of all, usually there is only one hard-drive, with one writing head in it. Even if system call will block and yield control to another thread, this will result in a performance digression, as two threads will now compete for the header position. If you have SSD, there will not be any competition for the header, but the performance will be still worse, as you will spoil your caches. But fortunately, usually writing on regular files doesn't block. So your threads will run consequently, i.e., they will be serialized.

于 2015-07-01T13:56:58.707 回答
3

如果您查看您的文件,您会发现它们都是 4097K——即从 /dev/urandom 读取的 4096K,加上换行符的一个字节。您正在使用 Lwt_io.read 达到缓冲区最大值,因此即使您说 ~count:awholelot,它也只会给您 ~count:4096。

我不知道执行此操作的规范 Lwt 方法是什么,但这是一种替代方法:

open Lwt

let stream_a_little source n = 
    let left = ref n in
    Lwt_stream.from (fun () -> 
        if !left <= 0 then return None
        else Lwt_io.read ~count:!left source >>= (fun s -> 
            left:=!left - (Bytes.length s);
            return (Some s)
        ))

let main () =
    Lwt_io.open_file ~buffer_size:(4096*8) ~mode:Lwt_io.Input "/dev/urandom" >>= fun data_source ->
        Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
            Lwt_list.iter_p
        (fun count ->
            let count = string_of_int count in
            Lwt_io.open_file
           ~flags:[Unix.O_RDWR; Unix.O_CREAT]
           ~perm:0o777
           ~mode:Lwt_io.Output ("serial/file"^ count ^ ".txt") >>= (fun h ->
               Lwt_stream.iter_s (Lwt_io.write h)
               (stream_a_little data_source 52428800)))
        [0;1;2;3;4;5;6;7;8;9]

let timeit f =
        let start = Unix.time () in
        f () >>= fun () ->
            let finished = Unix.time () in
            Lwt_io.printlf "Execution time took %f seconds" (finished -. start)

let () =
    Lwt_main.run (timeit main)

编辑:请注意,lwt 是合作社线程库;当您有两个线程“同时”运行时,它们实际上并不会同时在您的 OCaml 进程中执行操作。OCaml(到目前为止)是单核的,所以当一个线程移动时,其他线程会很好地等待,直到该线程说“好的,我已经完成了一些工作,你们其他人去”。因此,当您尝试同时流式传输到 8 个文件时,您基本上是在向 file1 分配一点随机性,然后向 file2 分配一点,……向 file8 分配一点,然后(如果还有工作要做)一点到file1,然后再到file2等。如果您无论如何都在等待大量输入(例如您的输入来自网络),并且您的主进程有很多时间通过每个线程并检查“有任何输入吗?”,但是当你所有的线程都只是从 /dev/random 读取时,它'

EDIT2:展示了如何增加阅读器的缓冲区大小,提高了一点速度;)请注意,您也可以在旧示例中​​简单地将 buffer_size 设置为您想要的高,这将一次性读取所有内容,但您可以除非您多次阅读,否则不会超过您的 buffer_size。

于 2015-07-01T08:14:32.090 回答