问题标签 [rx.net]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
1703 浏览

c# - 使用 AsObservable 观察 TPL 数据流块而不消耗消息

我有一个 TPL 数据流块链,想观察系统内部某处的进展。

我知道我可以将 aTransformBlock插入我想要观察的网格中,让它发布到某种进度更新器,然后将消息原封不动地返回到下一个块。我不喜欢这个解决方案,因为该块纯粹是因为它的副作用而存在,而且我还必须在我想观察的任何地方更改块链接逻辑。

所以我想知道我是否可以ISourceBlock<T>.AsObservable用来观察网格内的消息传递,而无需更改它并且不消耗消息。如果可行的话,这似乎是一个更纯粹、更实用的解决方案。

从我对 Rx 的(有限)理解来看,这意味着我需要 observable 是热的而不是冷的,这样我的progress更新程序才能看到消息但不消耗它。并且.Publish().RefCount()似乎是使可观察到的热的方法。但是,它根本无法按预期工作 - 而是接收并使用block2每条progress消息。

结果是不确定的,但我得到了这样的混合:

那么,我做错了什么,还是由于 TPL 数据流AsObservable的实现方式而这不可能?

我意识到我也可以用 Observable/Observer 对替换LinkTobetween block1and ,这可能会起作用,但下游是我首先使用 TPL Dataflow 的全部原因。block2LinkToBoundedCapacity = 1

编辑: 一些澄清:

  • 我确实打算BoundedCapacity=1在block2中设置。虽然在这个简单的示例中没有必要,但下游受限的情况是我发现 TPL 数据流真正有用的地方。
  • 为了澄清我在第二段中拒绝的解决方案,可以在 block1 和 block2 之间添加以下链接:

    var progressBlock = new TransformBlock<int, int>( i => {SomeUpdateProgressMethod(i); return i;});

  • 我还想保持背压,这样如果更上游的区块正在向其他同等工作人员分配工作,如果该链已经很忙block1,它就不会向其发送工作。block1

0 投票
1 回答
4110 浏览

.net - RxJS/Rx.Net Observable-subscribe vs events - Performance/Threads

I recently started working on Reactive Extensions, mostly observables at client side using Angular 2. The concept of observables of Rx and events of dotnet seems to be very similar. Are there any specific examples where one is applicable and other is not. If not, is there any other reason why Microsoft introduced Rx.Net, as observables stand at the core of Reactive Extensions. Any links or real time examples would suffice. Has it got something better to do with respect to async/await, tasks or threads? I am looking for differences threading/performance wise.

0 投票
4 回答
1591 浏览

c# - 如何使用带有异步操作的 Rx.Nex 扩展 ForEachAsync

我有从 SQL 流式传输数据并将其写入不同存储的代码。代码大致是这样的:

我想为此使用反应式扩展。理想情况下,代码如下所示:

但是,扩展方法 ForEachAsync 似乎只接受同步操作。是否可以编写一个接受异步操作的扩展?

0 投票
2 回答
2241 浏览

javascript - React Native、Rx.Net 和 RxSwift:共性

我正在使用RxSwift作为其他人开始的项目的一部分。

为了更多地了解 ReactiveX 的理论,我偶然发现了React NativeRx.Net

我想确保我正确理解以下内容:

  • React Native是一个 Java 脚本库,允许开发人员构建本地用户界面。该库将 Javascript 代码转换为本地移动代码。
  • RxSwift旨在实现Rx.Net的概念,但与React Native不同并且没有直接链接。共同点是它们都实现了响应式编程的概念,因此为什么在他们的名字中共享使用“React”/“Reactive”这个词。这就是共同点应该结束的地方,对吧?

结论:

  • 由于 RxSwift 和 React Native 都与用户界面实现紧密相连,因此不可能一起使用它们。正确的?
0 投票
1 回答
65 浏览

nhibernate - 如何在 Rx.NET 中使用 NHibernate?

我正在浏览博客和 reactivex.net 书籍,但很难了解如何从 NHibernate 查询创建可观察流。在 RxJava 中,我会使用 Observable.fromCallable - 或者这甚至不是最好的方法。我知道我需要从查询结果中打开一个无状态流,但是关于如何做到这一点的语法却让我难以理解。

从 NHibernate 查询创建 Observable 流的语法是什么?另外,如果不同,如何发出坚持?

谢谢!

0 投票
4 回答
615 浏览

c# - 是否可以在按 key 分组的 hot observable 中对最新项目进行采样?

在 Rx.NET 中,是否可以在按 key 分组的 hot observable 中对最新项目进行采样?

例如,如果我有一个IObservable<Price>,则在哪里Price

让我们假设IObservable链接到外部价格馈送。

我是否能够检索所有最新Price的 s,按 分组Key,使用 Rx 每 1 秒采样一次?

0 投票
1 回答
101 浏览

c# - .netcore TestScheduler 的 Nuget 包

这很愚蠢,但我似乎找不到包含 TestScheduler 的 .net rx nuget 包

请指出我正确的方向。

0 投票
1 回答
753 浏览

c# - 在 rx.net 中使用 .buffer 的成对运算符

我需要从外部驱动程序(例如 USB 密钥)自动备份我的文件当插入驱动程序时。要知道何时插入新的 USB 或硬盘驱动器我使用了一个 observable,问题是我知道的唯一方法获取连接的驱动程序是通过使用以下功能:

返回包含每个链接驱动程序的列表:我实现此函数以了解何时连接了新的大容量存储器:

它不起作用,因为我只比较了一对二对二驱动程序(使用“除外”功能) 数值示例:1,2;3.4;5.6;. 这样,如果我重新插入相同的驱动程序,我将两次备份它,因为我不会将第三个驱动程序与第一个驱动程序进行比较(例如)。

我认为我需要的行为是 rxjs 中的成对运算符,而不是 .net 中的行为,即: 1.2;2.3;3.4;4.5;5.6;ETC

附言。抱歉,如果我不太清楚并且我的英语很差:^)

感谢您的任何回答

0 投票
1 回答
306 浏览

c# - 在 Rx.NET / ReactiveUI 中使用树

如何观察树的任何子级别的属性变化?

例如,考虑一个TreeNode具有属性Name和的类ChildNodes。如何观察Namea 的任何子级别的变化TreeNode

用法可能如下所示:

树节点示例:

0 投票
1 回答
449 浏览

c# - '滑动' RX.net .Throttle() 窗口

我对一个超出正常限制的可观察流有一些特殊需要/要求,我不完全确定如何做到这一点:

基本上我有一个可观察的流,最初来自一个正常的事件,如下所示:

现在,由于这些事件可以快速连续发生,我只需要知道它是否在给定的时间范围内至少发生一次,我通常会使用 .Throttle(),即像这样:

但是我的实际要求更进一步:如果在该限制 TimeSpan / dueTime 内引发了一个事件,并且如果在第一个事件之后引发了另一个事件但仍在该到期时间之内,我希望受限制的流再次从 0 等待时间重新开始再等待 300 毫秒……如果引发了另一个事件,请再次重新开始/延长该时间……依此类推。只有在原始或重新启动的 TimeSpan(s)/dueTime 内没有引发其他事件时,才someThrottledEventObservable应该产生一个新的 Unit 实例。

我希望这是有道理的 - 但基本上我想要/需要一个受限制的事件流,只要源流在给定时间内停止产生新事件并且如果在该等待时间内发生新事件,受限制的流应该重新启动等待。

或者:在持续的事件“风暴”中,仅 .Throttle() 就会每 300 毫秒产生一个新单元(在上面的示例中),但是每当一个或多个事件被触发但在 300 毫秒内没有发生新事件时,我都想要一个新单元之后的冷却期。

我该怎么做?