9

I'm trying to use the OBus library with Lwt_react. This uses "functional reactive programming" for properties and signals.

The problem (as noted in the React documentation) is that OCaml may garbage collect your callback while you're still using it. There's a keep function, which keeps the handler forever, but I don't want that. I do want to free it eventually, just not while I still need it.

So, I thought I'd attach the handler to a switch:

let keep ~switch handler =
  Lwt_switch.add_hook (Some switch) (fun () ->
    ignore handler;
    Lwt.return ()
  )

But my event handler gets garbage-collected anyway (which makes sense, since the code to turn off the switch is called when the signal arrives, so it's only the signal handler keeping the switch alive in the first place).

Here's a simplified (stand-alone) version of my code:

(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

let finished_event, fire_finished = React.E.create ()

let setup () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  let dont_gc_me = Lwt_react.E.map handler finished_event in
  ignore dont_gc_me;  (* What goes here? *)

  print_endline "Waiting for signal...";
  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

let () =
  let finished = Lwt.protected (setup ()) in

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  Lwt_main.run finished;
  print_endline "Done";

Without the Gc.full_major line, this normally prints Done. With it, it just hangs at Waiting for signal....

Edit: I've split setup (the real code) from the test driver and added a Lwt.protected wrapper to avoid masking the problem by accident of Lwt's cancellations.

4

4 回答 4

7

这是从我的某个项目中截取的片段,已修复以解决此弱引用问题(谢谢!)。第一部分是保持全局根指向您的对象。第二部分是将信号/事件的活跃度限制在 Lwt 线程的范围内。

请注意,反应实体被克隆并显式停止,这可能与您的期望不完全相符。

module Keep : sig 
  type t
  val this : 'a -> t
  val release : t -> unit
end = struct
  type t = {mutable prev: t; mutable next: t; mutable keep: (unit -> unit)}
  let rec root = {next = root; prev = root; keep = ignore}

  let release item =
    item.next.prev <- item.prev;
    item.prev.next <- item.next;
    item.prev <- item;
    item.next <- item;
    (* In case user-code keep a reference to item *)
    item.keep <- ignore

  let attach keep =
    let item = {next = root.next; prev = root; keep} in
    root.next.prev <- item;
    root.next <- item;
    item

  let this a = attach (fun () -> ignore a)
end

module React_utils : sig
  val with_signal : 'a signal -> ('a signal -> 'b Lwt.t) -> 'b Lwt.t
  val with_event  : 'a event -> ('a event -> 'b Lwt.t) -> 'b Lwt.t
end = struct
  let with_signal s f =
    let clone = S.map (fun x -> x) s in
    let kept = Keep.this clone in
    Lwt.finalize (fun () -> f clone)
                 (fun () -> S.stop clone; Keep.release kept; Lwt.return_unit)
  let with_event e f =
    let clone = E.map (fun x -> x) e in
    let kept = Keep.this clone in
    Lwt.finalize (fun () -> f clone)
                 (fun () -> E.stop clone; Keep.release kept; Lwt.return_unit)
end

用这个解决你的例子:

let run () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  (* We use [Lwt.async] because are not interested in knowing when exactly the reference will be released *)
  Lwt.async (fun () ->
    (React_utils.with_event (Lwt_react.E.map handler finished_event)
      (fun _dont_gc_me -> finished)));
  print_endline "Waiting for signal...";

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)
于 2013-11-14T10:57:35.900 回答
1

这是我当前的(hacky)解决方法。每个处理程序都被添加到全局哈希表中,然后在关闭开关时再次删除:

let keep =
  let kept = Hashtbl.create 10 in
  let next = ref 0 in
  fun ~switch value ->
    let ticket = !next in
    incr next;
    Hashtbl.add kept ticket value;
    Lwt_switch.add_hook (Some switch) (fun () ->
      Hashtbl.remove kept ticket;
      Lwt.return ()
    )

它是这样使用的:

Lwt_react.E.map handler event |> keep ~switch;
于 2013-11-14T15:07:32.030 回答
1

处理此问题的一种简单方法是保留对您的事件的引用,并React.E.stop在您不再需要它时调用它:

(* ocamlfind ocamlopt -package react,lwt,lwt.react,lwt.unix -linkpkg -o test test.ml *)

let finished_event, fire_finished = React.E.create ()

let run () =
  let switch = Lwt_switch.create () in

  let finished, waker = Lwt.wait () in
  let handler () = Lwt.wakeup waker () in
  let ev = Lwt_react.E.map handler finished_event in
  print_endline "Waiting for signal...";

  Gc.full_major ();  (* Force GC, to demonstrate problem *)
  fire_finished ();  (* Simulate send *)

  React.E.stop ev;

  Lwt.bind finished (fun () -> Lwt_switch.turn_off switch)

let () =
  Lwt_main.run (run ());
  print_endline "Done";
于 2013-11-15T09:40:03.470 回答
0

请注意,如果 lwt 不支持取消,那么您将通过替换Lwt.protected (setup ())来观察相同的行为Lwt.bind (setup ()) Lwt.return

基本上你所拥有的是:

finished_event --weak--> SETUP --> finished

其中SETUP是事件和 Lwt 线程之间的循环。删除 Lwt.protected 只会压缩最后一个指针,因此它恰好可以执行您想要的操作。

Lwt 只有前向指针(除了支持取消),React 只有后向指针(前向指针很弱)。所以让这个工作正常的方法是返回事件而不是线程。

于 2013-11-18T10:41:35.780 回答