问题标签 [rx.net]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
rx.net - 如何将事件发布到 IObservable
所以说我有以下课程:
如何将 ChangeUsername(实现 IEvent)附加到我的事件序列中?并通知我的订阅者发生了 ChangeUsername 事件。
rxjs - 中频交易系统的颠覆者与反应式架构
我正在尝试为我正在研究的中频交易系统选择合适的架构。目前,我从 Web Socket 或 Rest 接收消息并在那里处理它们。有时它包括 IO 操作(即额外的休息请求),所以它工作非常缓慢,我想所有其他消息都在 Web Socket 客户端的实现中得到缓冲。这种幼稚的方法看起来不太可扩展。
我一直在阅读处理交易消息的成熟架构,目前,我的选择已缩小到 Disruptor 和 Reactive 编程。我想征求您的意见,哪个是更好的选择。具体来说,我担心两种情况:
- 消息处理程序之间的逻辑依赖关系。当我连接到特定交易所时,我需要接收余额和未结订单,然后才能处理交易消息并根据它们下订单。在我看来,响应式是处理这种需要流量控制的情况的更好方法。Disruptor 有问题吗?
- 长时间运行的消息处理程序。消息处理程序应该尽可能快(不要阻止以下消息),但是如果我需要发出一个休息请求来创建一个订单作为消息处理程序的一部分,那么正确的方法是什么?
c# - 某处是否有 Ix.NET (System.Interactive) 的示例?
我有一个异步方法,比如:
并将从以下位置调用:
上面的语法无效,但基本上我在使用异步生成器。我知道它可以通过 Observables 处理。我确实对 Rx.NET 进行了实验,它在一定程度上起作用。但我试图避免它给代码库带来的复杂性,更重要的是,上述要求本质上仍然不是一个反应式系统(我们的仍然是基于拉的)。例如,我只会在一段时间内收听传入的异步流,并且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。
我可以像这样反转方法签名:
但这使得在不阻塞的情况下进行 LINQ 操作有点棘手。我希望它是非阻塞的,并且不将整个东西加载到内存中。这个库:AsyncEnumerable完全符合我的要求,但如何使用Ix.NET完成相同的操作?我相信它们是为了同样的事情。
换句话说,我如何利用 Ix.NETIAsyncEnumerable
在处理时生成一个await
?喜欢,
wpf - ReactiveUI 和 WithLatestFrom
我刚刚开始研究 ReactiveUI,我想我错过了一些东西。假设我有一个“连接”按钮,并想根据文本框中的服务器地址创建一个新的网络连接。我想,我会创建一个 ReactiveCommand 并将其绑定到 Button,然后使用服务器地址属性的值执行类似 WithLatestFrom 的操作(这就是我在 Java 或 Typescript 中完成的方式)。
但是我找不到正确的语法。谁能详细说明?BR,丹尼尔
c# - Zip 的行为不符合预期
在我的最新项目中,我决定测试响应式编程世界提供的一些功能。由于它是一个 C# 项目,我已经开始使用 System.Reactive 和 ReactiveProperty nuget 包。在我的项目中,我尝试通过以下代码组合绑定到 WPF 文本框控件的 2 个属性:
绑定到 RxTest 的文本框获取“演示测试”的初始值 - 但是当我编辑绑定到 InputFieldFirst 或 InputFieldSecond 的文本框的内容时,不会触发更新。
当我按如下更改代码时,对 InputFieldFirst 的所有更新都是可见的,并在 RxTest 中按预期延迟。
任何提示如何设置 .Zip() 都会非常好。
更新完成
文本框的 XAML 代码
c# - RX 术语:当有频繁的可观察通知时,RX 运算符中的异步处理
目的是在 RX 运算符中对稀缺资源进行一些异步工作,例如 Select。当可观察通知的发送速度快于异步操作完成所需的时间时,就会出现问题。
现在我实际上解决了这个问题。我的问题是这种特殊问题的正确术语是什么?它有名字吗?是背压吗?到目前为止,我所做的研究表明,这是某种压力问题,但根据我的理解,不一定是背压。我找到的最相关的资源是: https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0) http://reactivex.io/documentation/operators/backpressure.html
现在到实际代码。假设有一种稀缺资源,它是消费者。在这种情况下,资源在使用时会引发异常。请注意,不应更改此代码。
现在这是使用资源的幼稚实现的问题。抛出错误是因为通知的速度比消费者运行的速度更快。
使用下面的代码,问题就解决了。我通过将已完成的 BehaviorSubject 与 Zip 运算符结合使用来减慢处理速度。本质上,这段代码所做的是采用顺序方法而不是并行方法。
问题 这是背压吗?如果不是,是否还有其他相关术语?
c# - 从可观察到的最新历史数据切换到可观察到且无重复的实时数据
应用程序(Saver)通过 websocket 从远程服务器接收实时数据并将其存储在数据库中。它向客户端公开一个 REST 端点,该端点返回迄今为止存储在数据库中的所有数据。
客户端应用程序在启动时订阅远程服务器的 websocket 上的实时数据。然后它向 Saver 的 REST 端点发出请求,并接收到目前为止的所有数据。
IObservable<AType>
两个数据源都像在客户端应用程序中一样公开。
AType
包括 Timestamp 属性。
如何将这两个 Observable 组合起来,使它们是连续的(按时间戳)而没有重复?
更新:在任一数据源/Observables 中都不可能出现重复,但在它们组合时是可能的,因为在调用 REST 端点之前订阅了 websocket。订阅它们是为了避免数据丢失。
c# - 使用不同的参数值运行任务 N 时间 - RX.net
我想要实现的是创建一个全局任务,它可以在一次运行中执行 n 次,但它可以将对象作为参数并具有不同的值,如下所示
我有一个列表,其中包含我的全局任务将要接收的所有对象,
然后我有一个委托函数列表
然后我想将此列表中的每个任务称为执行任务,我看到的是我必须设置要调用的参数,在这种情况下,我为所有任务都设置了“A”,但它需要第一次初始化这还不错,因为它是我想要的。
但我的问题是,还有其他方法可以做到这一点吗?每次我调用 Observable.Start 时,这些任务已经运行了吗?或者当我用 Observable.Start 调用它们时它们才开始?在不考虑首次初始化的情况下传递“A”值 (StringList.Add("A") ,StringList.Add("C")...) 是错误还是发生了什么?
reactive-programming - 如何在 rx.net 中组合 GroupedObservables?
我有一个 observableGroupBy
用于获取多个流。我实际上想要Scan
每个子流的结果。假设 observable 超过产品价格,扫描结果是每种产品类型的平均价格。
我有另一个与这些“产品”相关的事件流(比如说“显示产品价格”事件),我想将它与上一个流的最新产品价格结合起来。因此,Scan
每组的输出需要与事件流的每个元素相结合,以获得该事件产品的最新平均价格。
出于某种原因,我无法获得正确的语法,并且我整天都在抨击这一点。有人可以帮忙吗?
更新
我添加下面的代码来说明大概的意图。
c# - Visual Studio 2013 中的 C# 方法重载解决问题
在 Rx.NET 库中提供这三种方法
我在MSVS 2013中编写了以下示例代码:
由于不明确的重载,这不会编译。编译器的确切输出为:
但是,一旦我替换while( true )
为while( false )
或var condition = true; while( condition )...
错误消失了,方法调用解决了这个问题:
那里发生了什么?