1

在 Suave 2.4.0 支持下TransferEncoding.chunkedHttpOutput.writeChunk我编写了以下代码通过 HTTP 流式传输数据。

let sendStrings getStringsFromProducer : WebPart =
    Writers.setStatus HTTP_200 >=>
    TransferEncoding.chunked (fun conn -> socket {
        let refConn = ref conn

        for str in getStringsFromProducer do
            let! (_, conn) = (str |> stringToBytes |> HttpOutput.writeChunk) !refConn
            refConn := conn

        return! HttpOutput.writeChunk [||] !refConn
    }
)

虽然这可行,但我质疑使用的可靠性,ref并希望有更好的方法以更实用的方式做同样的事情。有没有更好的方法来做到这一点?假设我不能改变getStringsFromProducer

4

2 回答 2

4

我认为在这种情况下你不能避免所有的突变——一个一个地编写块是一个相当必要的操作,并且迭代一个惰性序列也需要(可变的)迭代器,所以没有办法避免所有的突变。我认为您的sendStrings函数在向消费者隐藏突变方面做得很好,并提供了一个很好的功能 API。

您可以避免使用ref单元格并将它们替换为局部可变变量,这更安全一些 - 因为可变变量无法逃脱局部范围:

TransferEncoding.chunked (fun conn -> socket {
    let mutable conn = conn
    for str in getStringsFromProducer do
        let! _, newConn = HttpOutput.writeChunk (stringToBytes str) conn
        conn <- newConn
    return! HttpOutput.writeChunk [||] conn
}

您可以通过使用递归来避免可变conn变量,但这需要您使用IEnumerator<'T>而不是使用漂亮的for循环来迭代序列,所以我认为这实际上不如使用可变变量的版本好:

TransferEncoding.chunked (fun conn -> socket {
    let en = getStringsFromProducer.GetEnumerator()
    let rec loop conn = socket {
      if en.MoveNext() then 
        let! _, conn = HttpOutput.writeChunk (stringToBytes en.Current) conn
        return! loop conn }
    do! loop conn
    return! HttpOutput.writeChunk [||] conn }) 
于 2018-05-17T10:01:19.253 回答
0

我一直在寻找一种以一般方式替换 F# 中的 refs/mutables 的方法,虽然我想出了一个解决方案,但在你的情况下它可能有点矫枉过正。看起来 ref 是一个仅在单个线程内更新的本地,因此它可能相当安全。但是,如果你想更换它,我是这样解决问题的:

type private StateMessage<'a> =
| Get of AsyncReplyChannel<'a>
| GetOrSet of 'a * AsyncReplyChannel<'a>
| GetOrSetResult of (unit -> 'a) * AsyncReplyChannel<'a>
| Set of 'a
| Update of ('a -> 'a) * AsyncReplyChannel<'a>

type Stateful<'a>(?initialValue: 'a) =
    let agent = MailboxProcessor<StateMessage<'a>>.Start
                <| fun inbox ->
                    let rec loop state =
                        async {
                            let! message = inbox.Receive()
                            match message with
                            | Get channel -> 
                                match state with
                                | Some value -> channel.Reply(value)
                                | None -> channel.Reply(Unchecked.defaultof<'a>)
                                return! loop state
                            | GetOrSet (newValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | GetOrSetResult (getValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    let newValue = getValue ()
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | Set value -> 
                                return! loop (Some value)
                            | Update (update, channel) ->
                                let currentValue =
                                    match state with
                                    | Some value -> value
                                    | None -> Unchecked.defaultof<'a>
                                let newValue = update currentValue
                                channel.Reply(newValue)
                                return! loop (Some newValue)
                        }
                    loop initialValue

    let get () = agent.PostAndReply Get
    let asyncGet () = agent.PostAndAsyncReply Get
    let getOrSet value = agent.PostAndReply <| fun reply -> GetOrSet (value, reply)
    let asyncGetOrSet value = agent.PostAndAsyncReply <| fun reply -> GetOrSet (value, reply)
    let getOrSetResult getValue = agent.PostAndReply <| fun reply -> GetOrSetResult (getValue, reply)
    let asyncGetOrSetResult getValue = agent.PostAndAsyncReply <| fun reply -> GetOrSetResult (getValue, reply)
    let set value = agent.Post <| Set value
    let update f = agent.PostAndReply <| fun reply -> Update (f, reply)
    let asyncUpdate f = agent.PostAndAsyncReply <| fun reply -> Update (f, reply)

    member __.Get () = get ()
    member __.AsyncGet () = asyncGet ()
    member __.GetOrSet value = getOrSet value
    member __.AsyncGetOrSet value = asyncGetOrSet value
    member __.GetOrSetResult getValue = getOrSetResult getValue
    member __.AsyncGetOrSetResult getValue = asyncGetOrSetResult getValue
    member __.Set value = set value
    member __.Update f = update f
    member __.AsyncUpdate f = asyncUpdate f

这基本上使用 aMailboxProcessor将更新序列化为由尾递归函数管理的状态,类似于 Tomas 的第二个示例。但是,这允许您以更像传统可变状态的方式调用 Get/Set/Update,即使它实际上并没有进行突变。你可以像这样使用它:

let state = Stateful(0)
state.Get() |> printfn "%d"
state.Set(1)
state.Get() |> printfn "%d"
state.Update(fun x -> x + 1) |> printfn "%d"

这将打印:

0
1 
2
于 2018-05-17T13:30:59.470 回答