I'm using a slightly modified version of the RX builder presented here:
http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html
Rather than working with IObservable<'T>
directly, my computation expression has the type:
/// Single-case union wrapping a function from an input IObservable<'a>
/// to an output IObservable<'b>.
type MyType<'a,'b> =
    | MyType of (IObservable<'a> -> IObservable<'b>)
/// Unwraps a MyType value and returns the function it carries.
let extract wrapped =
    match wrapped with
    | MyType transform -> transform
Combinators then take on the form:
/// Combinator: runs `m` against the input and keeps only the values for
/// which the predicate `f` returns true (via the Rx `Where` operator).
let where (f: 'b -> bool) (m:MyType<_,'b>) =
    MyType(fun input ->
        let upstream = extract m input
        upstream.Where(f))
Within the expression itself, I often need to refer back to previous values that have been fed into the stream. To do so, I've defined a MyType
that maintains a rolling immutable list of the n
most recent values.
/// Builds a MyType whose output emits one value: a `HistoryReadOnly` view
/// over a `History` buffer created with argument `n`. Every value from the
/// input is passed to the buffer's `Push`; errors and completion from the
/// input are forwarded to the downstream observer.
/// NOTE(review): `History` / `HistoryReadOnly` are defined elsewhere; per the
/// surrounding text they keep the n most recent values — confirm.
let history n =
    MyType(fun input ->
        Observable.Create(fun (observer: IObserver<_>) ->
            let window = new History<_>(n)
            // Hand the (initially unfilled) history view downstream first ...
            observer.OnNext(HistoryReadOnly(window))
            // ... then start feeding input values into the backing buffer.
            input.Subscribe(window.Push, observer.OnError, observer.OnCompleted)))
With this, I can now do something like:
// Usage sketch (incomplete fragment): inside the builder expression, `let!`
// binds the value emitted by `history 20` to the name `history`.
// NOTE(review): `obs` is presumably an instance of the builder — confirm.
let f = obs {
let! history = history 20
// Run some other types, and possibly do something with history
}
I find that I use this history quite frequently; ideally, I would like to have it embedded directly into IObservable<'a>
. Obviously I can't do that. So my question is: what is a reasonable way to introduce the concept of history that I have here? Should I be extending IObservable<'T>
(I'm not sure how to do that), or wrapping the IObservable<'T>
?
I appreciate any suggestions.
Edit: Added full example code.
open System
open System.Collections.Generic
open System.Reactive.Subjects
open System.Reactive.Linq
// Container function
/// Single-case union wrapping a function from an input IObservable<'a>
/// to an output IObservable<'b>.
type MyType<'a,'b> =
    | MyType of (IObservable<'a> -> IObservable<'b>)
/// Unwraps a MyType value and returns the function it carries.
let extract wrapped =
    match wrapped with
    | MyType transform -> transform
// Mini Builder
/// Bind for MyType: runs `myTypeB` against the input and, for every value it
/// produces, builds the next MyType with `f` and runs that against the SAME
/// input. The per-value result streams are flattened with `SelectMany`.
let internal mbind (myTypeB:MyType<'a,'b>) (f:'b -> MyType<'a,'c>) =
    MyType(fun input ->
        let source = extract myTypeB input
        // Continuation applied to each value of `source`; note that it closes
        // over `input`, so every continuation sees the original input stream.
        let continueWith valueB = extract (f valueB) input
        source.SelectMany(continueWith))
/// Computation-expression builder for MyType workflows.
type MyTypeBuilder() =
    /// `let!` — delegates to `mbind`.
    member __.Bind (m, f) = mbind m f

    /// Sequencing of two parts: both are run against the same input and
    /// their outputs are joined with the Rx `Concat` operator.
    member __.Combine (a, b) =
        MyType(fun input ->
            let first = extract a input
            let second = extract b input
            first.Concat(second))

    /// `yield` — ignores the input and emits the single value `r`.
    member __.Yield (r) = MyType(fun _ -> Observable.Return(r))

    /// `yield!` — returns the given MyType unchanged.
    member __.YieldFrom (m:MyType<_,_>) = m

    /// Empty branch — ignores the input and produces an empty observable.
    member __.Zero() = MyType(fun _ -> Observable.Empty())

    /// Invokes the delayed body immediately and returns its MyType.
    member __.Delay(f:unit -> MyType<'a,'b>) = f()
/// Builder instance used as `mtypeBuilder { ... }`.
let mtypeBuilder = MyTypeBuilder()
// Combinators
/// MyType whose output emits one value: a mutable `List` that is appended to
/// with every value subsequently received from the input. Errors and
/// completion from the input are forwarded to the downstream observer.
let simplehistory =
    MyType(fun input ->
        Observable.Create(fun (observer: IObserver<_>) ->
            let seen = List<_>()
            // Publish the (still empty) list first; it is filled in place
            // as input values arrive.
            observer.OnNext(seen)
            input.Subscribe(seen.Add, observer.OnError, observer.OnCompleted)))
/// Combinator: runs `m` against the input and keeps only the values for
/// which the predicate `f` returns true (via the Rx `Where` operator).
let where (f: 'b -> bool) m =
    MyType(fun input ->
        let upstream = extract m input
        upstream.Where(f))
/// Combinator: runs `m` against the input and applies the Rx `Take`
/// operator with count `n` to its output.
let take (n:int) m =
    MyType(fun input ->
        let upstream = extract m input
        upstream.Take(n))
/// Combinator: runs `m` against the input and applies the Rx `Buffer`
/// operator with a count of 1 to its output.
let buffer m =
    MyType(fun input ->
        let upstream = extract m input
        upstream.Buffer(1))
/// Identity MyType: its output is the input observable itself.
let stream = MyType(fun source -> source)
// Example
/// Runs `t` over the elements of `input` and returns everything it produced
/// as an array. The array is turned into an observable that is shared via
/// `Publish().RefCount()` before being handed to `t`; the call blocks on
/// `ToArray().Single()` to obtain the collected output.
let myTypeResult (t:MyType<'a,'b>) (input:'a[]) =
    let shared = input.ToObservable().Publish().RefCount()
    let output = extract t shared
    output.ToArray().Single()
/// Sample input: the integers 1 through 20.
let dat = Array.init 20 (fun i -> i + 1)
/// Example workflow: for each even value, take the first value that follows
/// it, then the next two odd values, and yield them together with the newest
/// and oldest entries of the history list.
let example =
    let isEven v = v % 2 = 0
    let isOdd v = v % 2 = 1
    mtypeBuilder {
        let! seen = simplehistory
        // For each even value
        let! someEven = stream |> where isEven
        // Potentially where a buffer operation would run; all values here come
        // after the previous match, i.e. we can't get values before the last match.
        let! firstValAfterPrevMatch = stream |> take 1
        // Take the 2 odd values that follow it
        let! odd = stream |> where isOdd |> take 2
        // Last visited item in the stream and the very first item
        let newest = seen.[seen.Count - 1]
        let oldest = seen.[0]
        // Newest item, oldest item, an even, the first value after the even, and an odd
        yield (newest, oldest, someEven, firstValAfterPrevMatch, odd)
    }
/// Output of running `example` over `dat`.
let result = dat |> myTypeResult example
val result : (int * int * int * int * int) [] =
[|(5, 1, 2, 3, 5); (7, 1, 2, 3, 7); (7, 1, 4, 5, 7); (9, 1, 4, 5, 9);
(9, 1, 6, 7, 9); (11, 1, 6, 7, 11); (11, 1, 8, 9, 11); (13, 1, 8, 9, 13);
(13, 1, 10, 11, 13); (15, 1, 10, 11, 15); (15, 1, 12, 13, 15);
(17, 1, 12, 13, 17); (17, 1, 14, 15, 17); (19, 1, 14, 15, 19);
(19, 1, 16, 17, 19)|]