3

I am experimenting with js_of_ocaml and node.js. As you know, node.js makes extensive use of callbacks to implement asynchronous requests without introducing explicit threads.

In OCaml we have a very nice threading library, Lwt, coming with a very useful syntax extension. I wrote a prototype with a binding to some node library (a AWS S3 client) and added a lwt-ish layer to hide the callback.

open Lwt.Infix
open Printf
open Js

let require_module s =
    Js.Unsafe.fun_call
      (Js.Unsafe.js_expr "require")
      [|Js.Unsafe.inject (Js.string s)|]

let _js_aws = require_module "aws-sdk"

let array_to_list a =
  let ax = ref [] in
  begin
    for i = 0 to a##.length - 1 do
      Optdef.iter (array_get a i) (fun x -> ax := x :: !ax)
    done;
    !ax
  end


class type error = object
end

class type bucket = object
  method _Name : js_string t readonly_prop
  method _CreationDate : date t readonly_prop
end

class type listBucketsData = object
  method _Buckets : (bucket t) js_array t readonly_prop
end

class type s3 = object
  method listBuckets :
    (error -> listBucketsData t -> unit) callback -> unit meth
end

let createClient : unit -> s3 t = fun () ->
  let constr_s3 = _js_aws##.S3 in
  new%js constr_s3 ()


module S3 : sig
  type t
  val create : unit -> t
  val list_buckets : t -> (string * string) list Lwt.t
end = struct
  type t = s3 Js.t

  let create () =
    createClient ()

  let list_buckets client =
    let cell_of_bucket_data data =
      ((to_string data##._Name),
       (to_string data##._CreationDate##toString))
    in
    let mvar = Lwt_mvar.create_empty () in
    let callback error buckets =
      let p () =
        if true then
          Lwt_mvar.put mvar
            (`Ok(List.map cell_of_bucket_data @@ array_to_list buckets##._Buckets))
        else
          Lwt_mvar.put mvar (`Error("Ups"))
      in
      Lwt.async p
    in
    begin
      client##listBuckets (wrap_callback callback);
      Lwt.bind
        (Lwt_mvar.take mvar)
        (function
          | `Ok(whatever) -> Lwt.return whatever
          | `Error(mesg) -> Lwt.fail_with mesg)
    end
end

let () =
  let s3 = S3.create() in
  let dump lst =
    Lwt_list.iter_s
      (fun (name, creation_date) ->
         printf "%32s\t%s\n" name creation_date;
         Lwt.return_unit)
      lst
  in
  let t () =
    S3.list_buckets s3
    >>= dump
  in
  begin
    Lwt.async t
  end

Since there is no binding to Lwt_main for node.js, I had to run my code with Lwt.async. What are the differences between running the code with Lwt.async rather than with Lwt_main.run – the latter not existing in node.js? Is it guaranteed that the program will wait until the asynchronous threads are completed before exiting, or is this rather a lucky but random behaviour of my code?

4

1 回答 1

1

Lwt_main.run函数递归地轮询它监督执行的线程。在每次迭代中,如果该线程仍在运行,则调度程序使用一个引擎 (from Lwt_engine) 来执行等待 I/O 的线程,方法是选择或同步事件。

在 Node.JS 中翻译它的自然方法是使用process.nextTick依赖于 Node.JS 自己的调度程序的方法。在这种情况下实现该Lwt_main.run功能可以很简单:

let next_tick (callback : unit -> unit) =
  Js.Unsafe.(fun_call
               (js_expr "process.nextTick")
               [| inject (Js.wrap_callback callback) |])

let rec run t =
  Lwt.wakeup_paused ();
  match Lwt.poll t with
    | Some x -> x
    | None -> next_tick (fun () -> run t)

此函数仅运行类型的线程,unit Lwt.t但这是程序的主要情况。可以使用 a 来计算任意值进行Lwt_mvar.t通信。

也可以扩展此示例以支持所有类型的挂钩,就像在原始Lwt_main.run实现中一样。

于 2016-01-24T12:17:48.570 回答