3

我目前正在学习Lwt。我有兴趣使用异步进程来用 OCaml 例程替换一些 shell 例程。

让我们看一下简化的第一次尝试,其中通过组合两个正在运行的线程来创建过滤器cat

let filter_cat ()=
  Lwt_process.pmap_lines ("cat", [| "cat" |])

let filter_t () =
  Lwt_io.stdin
  |> Lwt_io.read_lines
  |> filter_cat ()
  |> filter_cat ()
  |> Lwt_io.write_lines Lwt_io.stdout

let () =
  filter_t ()
  |> Lwt_main.run

此过滤器以某种方式工作,但在其标准输入关闭而不是退出时挂起。如果我删除其中一个filter_cat,它会按预期工作。

我猜我没有适当地组合这些过滤器,因此无法加入我开始的两个线程。组成这些过滤器的正确方法是什么,以便程序在读取后EOF终止stdin


您可以在Github gist中找到该程序和BSD Owl Makefile 。

4

1 回答 1

1

对此的答案是,Lwt 中存在一个小错误。有一个内部功能,监控执行管道的功能:

(* Monitor the thread [sender] in the stream [st] so write errors are
   reported. *)
let monitor sender st =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from
    (fun () ->
       match !state with
         | Init ->
             let getter = Lwt.apply Lwt_stream.get st in
             let result _ =
               match Lwt.state sender with
                 | Lwt.Sleep ->
                     (* The sender is still sleeping, behave as the
                        getter. *)
                     getter
                 | Lwt.Return _ ->
                     (* The sender terminated successfully, we are
                        done monitoring it. *)
                     state := Done;
                     getter
                 | Lwt.Fail _ ->
                     (* The sender failed, behave as the sender for
                        this element and save current getter. *)
                     state := Save getter;
                     sender
             in
             Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
         | Save t ->
             state := Done;
             t
         | Done ->
             Lwt_stream.get st)

问题出在定义中

let getter = Lwt.apply Lwt_stream.get st

getter进程遇到流的末尾时,它被保存,但sender丢失了,这似乎阻止了完成。这可以通过改进定义来解决,方法getter是告诉它sender在到达流的末尾时表现。

于 2016-01-25T08:11:15.347 回答