7

I'm using a slightly modified version of the RX builder presented here:

http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html

Rather than taking IObservable<'T> directly my computational expression has a type of:

type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)

let extract (MyType t) = t

Combinators then take on the form:

let where (f: 'b -> bool) (m:MyType<_,'b>) = MyType(fun input -> (extract m input).Where(f))

Within the expression itself, I often need to reference back to previous values that have been fed into the stream. In order to do so, I've defined a MyType which maintains a rolling immutable list of the n most recent values.

let history n = 
    MyType(fun input ->
        Observable.Create(fun (o:IObserver<_>) ->
            let buffer = new History<_>(n)
            o.OnNext(HistoryReadOnly(buffer))
            input.Subscribe(buffer.Push, o.OnError, o.OnCompleted)
        )
    )

With this, I can now do something like:

let f = obs {
    let! history = history 20
    // Run some other types, and possibly do something with history
}

I am finding that I am using this history quite frequently, ideally I would want to have this embedded directly into IObservable<'a>. Obviously I can't do that. So my question is, what is a reasonable way to introduce this concept of history that I have here. Should I be extending IObservable<'T> (not sure how to do that), wrapping the IObservable<'T>?

I appreciate any suggestions.

Edit: Added full example code.

open System
open System.Collections.Generic
open System.Reactive.Subjects
open System.Reactive.Linq

// Container function
type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)
let extract (MyType t) = t

// Mini Builder
let internal mbind (myTypeB:MyType<'a,'b>) (f:'b -> MyType<'a,'c>) = 
    MyType(fun input ->
        let obsB = extract myTypeB input
        let myTypeC= fun resB -> extract (f resB) input
        obsB.SelectMany(myTypeC)
    )

type MyTypeBuilder() = 
    member x.Bind (m,f) = mbind m f
    member x.Combine (a,b) = MyType(fun input -> (extract a input).Concat(extract b input))
    member x.Yield (r) = MyType(fun input -> Observable.Return(r))
    member x.YieldFrom (m:MyType<_,_>) = m
    member x.Zero() = MyType(fun input -> Observable.Empty())
    member x.Delay(f:unit -> MyType<'a,'b>) = f() 

let mtypeBuilder = new MyTypeBuilder()

// Combinators
let simplehistory = 
    MyType(fun input ->
        Observable.Create(fun (o:IObserver<_>) ->
            let buffer = new List<_>()
            o.OnNext(buffer)
            input.Subscribe(buffer.Add, o.OnError, o.OnCompleted)
        )
    )

let where (f: 'b -> bool) m = MyType(fun input -> (extract m input).Where(f))
let take (n:int) m = MyType(fun input -> (extract m input).Take(n))
let buffer m = MyType(fun input -> (extract m input).Buffer(1))
let stream = MyType(id)

// Example
let myTypeResult (t:MyType<'a,'b>) (input:'a[]) = (extract t (input.ToObservable().Publish().RefCount())).ToArray().Single()

let dat = [|1 .. 20|]

let example = mtypeBuilder {
    let! history = simplehistory
    let! someEven = stream |> where(fun v -> v % 2 = 0) // Foreach Even
    let! firstValAfterPrevMatch = stream |> take 1 // Potentially where a buffer operation would run, all values here are after i.e. we cant get values before last match
    let! odd = stream |> where(fun v -> v % 2 = 1) |> take 2 // Take 2 odds that follow it
    yield (history.[history.Count - 1], history.[0], someEven,firstValAfterPrevMatch, odd) // Return the last visited item in our stream, the very first item, an even, the first value after the even and an odd
}

let result = myTypeResult example dat

val result : (int * int * int * int * int) [] =
  [|(5, 1, 2, 3, 5); (7, 1, 2, 3, 7); (7, 1, 4, 5, 7); (9, 1, 4, 5, 9);
    (9, 1, 6, 7, 9); (11, 1, 6, 7, 11); (11, 1, 8, 9, 11); (13, 1, 8, 9, 13);
    (13, 1, 10, 11, 13); (15, 1, 10, 11, 15); (15, 1, 12, 13, 15);
    (17, 1, 12, 13, 17); (17, 1, 14, 15, 17); (19, 1, 14, 15, 19);
    (19, 1, 16, 17, 19)|]
4

3 回答 3

3

使用标准的 Rx 工作流构建器,您可以创建一个history处理您的示例用例的函数:

let history (io:IObservable<_>) = 
  io.Scan(new List<_>(), (fun l t -> l.Add t; l)).Distinct()

let io = new Subject<int>()
let newio = rx { let! history = history io
                 let! even = io.Where(fun v -> v % 2 = 0)
                 let! next = io.Take 1
                 let! odd = io.Where(fun v -> v % 2 = 1).Take 2
                 yield (history.Last(), history.[0], even, next, odd) }

newio.Subscribe(printfn "%O") |> ignore

for i = 1 to 20 do
  io.OnNext i

扩展它以提供历史长度限制应该是微不足道的。您是否有特定的原因需要定义自己的类型/构建器,或者这样做只是为了达到这样的目的?

这是一个使用组合器的示例。您只需要在rx块外定义可观察对象。您可以history使用不可变的历史记录而不是持久列表以不同的方式工作,所以无论您需要什么。

let history (io:IObservable<_>) = 
  io.Scan(new List<_>(), (fun l t -> l.Add t; l))

let newest (hist:'a List) = hist.Last()
let extract (ioHist:'a List IObservable) = ioHist.Select newest
let take (i:int) (ioHist: 'a List IObservable) = ioHist.Take i
let where (f: 'a -> bool) (ioHist: 'a List IObservable) = ioHist.Where(fun hist -> f(newest hist))

let io = new Subject<int>()
let history1 = history io
let newio =
 rx { let! hist = history1.Distinct()
      let! even = extract (history1 |> where (fun v -> v % 2 = 0))
      let! next = extract (history1 |> take 1)
      let! odd = extract (history1 |> where (fun v -> v % 2 = 1) |> take 2)
      yield (hist.Last(), hist.[0], even, next, odd) }
于 2013-08-09T05:12:09.163 回答
2

您已经可以使用Observable.Buffer来执行此操作。抱歉,我的 F# 帽子今天没有思考 C#。

IObservable<int> source = ...
IOBservable<IList<int>> buffered = source.Buffer(5,1)

将为您创建一个列表流。

或者尝试在 LINQ 中使用类似于 F# 查询表达式的缓冲区

Console.WriteLine ("START");
var source = new List<int> () { 1, 2, 3, 4, 5 }.ToObservable ();

// LINQ C#'s Monad sugar
var r = 
        from buffer in source.Buffer (3, 1)
        from x in buffer
        from y in buffer
        select new { x,y};


r.Subscribe (o=>Console.WriteLine (o.x + " " + o.y));
Console.WriteLine ("END");

LINQ 中的注释与f# 查询表达式中的注释from完全相同/几乎相同。let!结果如下。还要注意我buffer稍后在expressionf# 查询表达式中的使用方式。

START
1 1
1 2
1 3
2 1
2 2
2 3
3 1
3 2
3 3
2 2
2 3
2 4
3 2
3 3
3 4
4 2
4 3
4 4
3 3
3 4
3 5
4 3
4 4
4 5
5 3
5 4
5 5
4 4
4 5
5 4
5 5
5 5
END
于 2013-08-07T15:01:21.810 回答
1

抱歉,我的 F# 非常生锈,但也许您正在寻找Scan操作员。当源产生它们时,它将值推送到累加器,然后您可以使用此累加器为您的投影产生下一个值。

在这里(在 C# 中,抱歉)我们采用 [0..10] 的序列,它以 100ms 的间隔产生这些值,然后我们返回一个运行中的 Sum。

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
                       .Take(10);

source.Scan(
    new List<long>(),               //Seed value
    (acc, value)=>                  //Accumulator function
        {
            acc.Add(value);
            return acc;
        }
    )
    .Select(accumate=>accumate.Sum())

它产生的值 [0,1,3,6,10,15,21,28,36,45] 相隔 100 毫秒。

我认为使用此工具,您可以管理值的历史记录(通过将它们添加到历史记录/累加器),然后在选择中使用此历史记录来投影适当的值。

于 2013-08-08T12:51:23.410 回答