概括
首先要说明的是,您实际上不需要使用Observable.FromEvent
来避免字符串字面量引用。此版本FromEventPattern
将起作用:
var groupedKeyPresses =
Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
h => KeyPress += h,
h => KeyPress -= h)
.Select(k => k.EventArgs.KeyChar)
.GroupBy(k => k);
如果你确实想做FromEvent
工作,你可以这样做:
var groupedKeyPresses =
Observable.FromEvent<KeyPressEventHandler, KeyPressEventArgs>(
handler =>
{
KeyPressEventHandler kpeHandler = (sender, e) => handler(e);
return kpeHandler;
},
h => KeyPress += h,
h => KeyPress -= h)
.Select(k => k.KeyChar)
.GroupBy(k => k);
为什么?这是因为FromEvent
操作符的存在是为了与任何事件委托类型一起工作。
这里的第一个参数是将事件连接到 Rx 订阅者的转换函数。它接受观察者 (an Action<T>
) 的 OnNext 处理程序,并返回与将调用该 OnNext 处理程序的底层事件委托兼容的处理程序。这个生成的处理程序然后可以订阅该事件。
我从不喜欢这个函数的官方 MSDN 文档,所以这里有一个扩展的解释,逐步介绍了这个函数的用法。
Observable.FromEvent 的内幕
以下分解了为什么FromEvent
存在以及它是如何工作的:
回顾 .NET 事件订阅的工作方式
考虑 .NET 事件是如何工作的。这些被实现为委托链。标准事件委托遵循 的模式delegate void FooHandler(object sender, EventArgs eventArgs)
,但实际上事件可以与任何委托类型一起使用(即使是具有返回类型的那些!)。我们通过将适当的委托传递给将其添加到委托链(通常通过 += 运算符)的特殊函数来订阅事件,或者如果尚未订阅任何处理程序,则委托将成为链的根。这就是为什么我们在引发事件时必须进行空检查。
引发事件时,(通常)调用委托链,以便依次调用链中的每个委托。要取消订阅 .NET 事件,将委托传递给一个特殊函数(通常通过 -= 运算符),以便可以将其从委托链中删除(遍历链直到找到匹配的引用,并且该链接是从链中删除)。
让我们创建一个简单但非标准的 .NET 事件实现。在这里,我使用不太常见的添加/删除语法来公开底层委托链并使我们能够记录订阅和取消订阅。我们的非标准事件具有一个带有整数和字符串参数的委托,而不是通常的object sender
子EventArgs
类:
public delegate void BarHandler(int x, string y);
public class Foo
{
private BarHandler delegateChain;
public event BarHandler BarEvent
{
add
{
delegateChain += value;
Console.WriteLine("Event handler added");
}
remove
{
delegateChain -= value;
Console.WriteLine("Event handler removed");
}
}
public void RaiseBar(int x, string y)
{
var temp = delegateChain;
if(temp != null)
{
delegateChain(x, y);
}
}
}
回顾 Rx 订阅的工作方式
现在考虑 Observable 流是如何工作的。Subscribe
通过调用该方法并传递一个实现IObserver<T>
接口的对象来形成对可观察对象的订阅,该对象具有可观察对象调用以处理事件的OnNext
、OnCompleted
和方法。OnError
此外,该Subscribe
方法返回一个IDisposable
可以取消订阅的句柄。
更典型的是,我们使用重载的便利扩展方法Subscribe
。这些扩展接受符合OnXXX
签名的委托处理程序,并透明地创建一个AnonymousObservable<T>
其OnXXX
方法将调用这些处理程序。
桥接 .NET 和 Rx 事件
那么我们如何创建一个桥来将 .NET 事件扩展到 Rx 可观察流中呢?调用 Observable.FromEvent 的结果是创建一个 IObservable,其Subscribe
方法的作用类似于将创建此桥的工厂。
.NET 事件模式没有完成或错误事件的表示。仅针对正在引发的事件。换句话说,我们只能连接映射到 Rx 的事件的三个方面,如下所示:
- 订阅,例如调用
IObservable<T>.Subscribe(SomeIObserver<T>)
映射到fooInstance.BarEvent += barHandlerInstance
.
- 调用,例如调用
barHandlerInstance(int x, string y)
映射到SomeObserver.OnNext(T arg)
- 取消订阅
IDisposable
,例如假设我们将调用返回的处理程序保留Subscribe
到一个名为 的变量subscription
中,然后调用subscription.Dispose()
映射到fooInstance.BarEvent -= barHandlerInstance
。
请注意,只有调用行为Subscribe
会创建订阅。因此,该Observable.FromEvent
调用返回一个支持订阅、调用和取消订阅基础事件的工厂。此时,没有发生事件订阅。只有在调用Subscribe
时,观察者及其处理程序才可用OnNext
。因此,FromEvent
调用必须接受可用于在适当时间实现三个桥接操作的工厂方法。
FromEvent 类型参数
FromEvent
所以现在让我们考虑一下上述事件的正确实现。
回想一下,OnNext
处理程序只接受一个参数。.NET 事件处理程序可以有任意数量的参数。所以我们的第一个决定是选择一种类型来表示目标可观察流中的事件调用。
实际上,这可以是您希望出现在目标可观察流中的任何类型。转换函数(稍后讨论)的工作是提供将事件调用转换为 OnNext 调用的逻辑 - 并且有足够的自由来决定如何发生这种情况。
在这里,我们将int x, string y
BarEvent 调用的参数映射到描述这两个值的格式化字符串。换句话说,我们将调用fooInstance.RaiseBar(1, "a")
to 导致调用someObserver.OnNext("X:1 Y:a")
.
这个例子应该解决一个非常常见的混淆来源:类型参数FromEvent
代表什么?这里第一种类型BarHandler
是源 .NET 事件委托类型,第二种类型是目标OnNext
处理程序的参数类型。因为第二种类型通常是一个EventArgs
子类,所以通常认为它必须是 .NET 事件委托的一些必要部分——很多人忽略了它的相关性实际上是由于OnNext
处理程序的事实。所以我们调用的第一部分FromEvent
如下所示:
var observableBar = Observable.FromEvent<BarHandler, string>(
转换函数
现在让我们考虑 的第一个参数FromEvent
,即所谓的转换函数。(注意,FromEvent
省略转换函数的一些重载 - 稍后会详细介绍。)
由于类型推断,lambda 语法可以被截断很多,所以这里有一个简单的版本:
(Action<string> onNextHandler) =>
{
BarHandler barHandler = (int x, string y) =>
{
onNextHandler("X:" + x + " Y:" + y);
};
return barHandler;
}
所以这个转换函数是一个工厂函数,在调用它时会创建一个与底层 .NET 事件兼容的处理程序。工厂函数接受OnNext
委托。此委托应由返回的处理程序调用,以响应使用底层 .NET 事件参数调用的处理程序函数。将使用将 .NET 事件参数转换为OnNext
参数类型实例的结果调用委托。因此,从上面的示例中我们可以看到,工厂函数将使用一个onNextHandler
类型Action<string>
来调用——它必须使用一个字符串值来调用,以响应每个 .NET 事件调用。工厂函数创建类型的委托处理程序BarHandler
onNextHandler
对于通过使用从相应事件调用的参数创建的格式化字符串调用来处理事件调用的 .NET 事件。
通过一些类型推断,我们可以将上面的代码折叠成以下等效代码:
onNextHandler => (int x, string y) => onNextHandler("X:" + x + " Y:" + y)
因此,转换函数实现了一些事件订阅逻辑,提供了一个函数来创建适当的事件处理程序,它还完成了将 .NET 事件调用映射到 RxOnNext
处理程序调用的工作。
如前所述,有一些重载FromEvent
忽略了转换函数。这是因为如果事件委托已经与OnNext
.
添加/删除处理程序
剩下的两个参数是 addHandler 和 removeHandler ,它们负责为实际的 .NET 事件订阅和取消订阅创建的委托处理程序 - 假设我们有一个被调用的实例,Foo
那么foo
完成的FromEvent
调用如下所示:
var observableBar = Observable.FromEvent<BarHandler, string>(
onNextHandler => (int x, string y) => onNextHandler("X:" + x + " Y:" + y),
h => foo.BarEvent += h,
h => foo.BarEvent -= h);
由我们决定如何获取我们要桥接的事件 - 因此我们提供添加和删除处理程序函数,期望提供创建的转换处理程序。事件通常是通过闭包捕获的,如上面我们关闭foo
实例的示例。
现在我们已经拥有了FromEvent
observable 完全实现订阅、调用和取消订阅的所有部分。
还有一件事...
最后还有一块胶水要提。Rx 优化了对 .NET 事件的订阅。实际上,对于任何给定数量的 observable 订阅者,只需对底层 .NET 事件进行一次订阅。然后通过该Publish
机制将其多播到 Rx 订阅者。就好像 aPublish().RefCount()
已附加到可观察对象。
考虑使用上面定义的委托和类的以下示例:
public static void Main()
{
var foo = new Foo();
var observableBar = Observable.FromEvent<BarHandler, string>(
onNextHandler => (int x, string y)
=> onNextHandler("X:" + x + " Y:" + y),
h => foo.BarEvent += h,
h => foo.BarEvent -= h);
var xs = observableBar.Subscribe(x => Console.WriteLine("xs: " + x));
foo.RaiseBar(1, "First");
var ys = observableBar.Subscribe(x => Console.WriteLine("ys: " + x));
foo.RaiseBar(1, "Second");
xs.Dispose();
foo.RaiseBar(1, "Third");
ys.Dispose();
}
这会产生以下输出,表明只进行了一次订阅:
Event handler added
xs: X:1 Y:First
xs: X:1 Y:Second
ys: X:1 Y:Second
ys: X:1 Y:Third
Event handler removed
我确实帮助这有助于消除对这个复杂功能如何工作的任何挥之不去的困惑!