问题标签 [rx.net]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
258 浏览

c# - 如何过滤子可观察对象至少有 x 个项目的可观察对象

给定具有 IObservable 子属性的对象的 IObservable,我如何去过滤主要的 observable 以仅包含子属性具有至少 X 个项目的实例。

例如下面的测试用例有一个包含 2 个 MyClass 的 IObservable。第一个包含 5 个整数的可观察值,第二个包含 2 个整数。所以我的问题是如何过滤 MyClass 的 IObservable 以仅包含包含 4 个或更多数字的实例?

任何帮助表示赞赏。

0 投票
3 回答
2790 浏览

rxjs - ReactiveX/Rx.NET 中 RxJS switchMap 的等价物

RxJS中,有一个switchMap函数。ReactiveX/Rx.NET中是否有等价物?我在转换文档中没有看到。

0 投票
5 回答
555 浏览

c# - 展开 IObservable> 进入 IObservable订单保存

有没有办法像这样展开IObservable<Task<T>>保持IObservable<T>相同的事件顺序?

假设我有一个使用消息流的桌面应用程序,其中一些需要大量的后处理:

我想有两种处理方法。

首先,我可以streamOfTasks使用异步事件处理程序订阅:

其次,我可以转换streamOfTasksusing Observable.Create,如下所示:

无论哪种方式,都不会保留消息的顺序:一些不需要任何后期处理的后期消息比需要后期处理的早期消息出来得更快。我的两个解决方案都并行处理传入的消息,但我希望它们按顺序处理,一个接一个。

我可以编写一个简单的任务队列来一次只处理一个任务,但这可能是一种矫枉过正。在我看来,我错过了一些明显的东西。


UPD。我编写了一个示例控制台程序来演示我的方法。到目前为止,所有解决方案都不保留事件的原始顺序。这是程序的输出:

这是完整的源代码:

0 投票
2 回答
1903 浏览

c# - Entity Framework Core async/wait deadlock when using "one-to-many" relationship and WebApi

I'm encountering a deadlock when using asynchronous implementation of an EF Core provider.

Say I have the following models:

Now I want to run the following query:

When I run this code via Console Application/XUnit test it works as excepted... But when I run it via ASP.Net WebApi it get into deadlock and never ends...

I used ConfigureAwait(false) all the way down in order to prevent these kind of situations but it seems that the problematic code is down underneath. I think that it might be under the System.Interactive.Async library which EFCore use - To be more specific it is under: https://github.com/Reactive-Extensions/Rx.NET/blob/develop/Ix.NET/Source/System.Interactive.Async/ToAsyncEnumerable.cs#L72 there is a call to "Result" which actually blocks the execution thread.

Does anyone encountered this behavior, maybe there is some workaround?

Notice that if I don't load the "Games" entities then everything also works fine...

Edit: Added the StackTrace: (See the call to ToEnumerable)

0 投票
1 回答
216 浏览

rx.net - 如何判断一个 Subject.OfType有任何订阅者

所以我正在尝试使用 Rx.NET 组合一个小消息总线。

这就是代码的要点。我想限制订阅,以便单个消息类型只能存在一个订阅。OfType<T>在添加新订阅之前,是否有任何现有订阅检查 Observable 的方法?

0 投票
1 回答
258 浏览

c# - Reactive Extensions ToEnumerable 如果不迭代所有内容,如何处理可观察状态

如果我ToEnumerable在 an 上使用扩展名IObservable,则用户有可能不会迭代所有元素。在那种情况下,如何正确处理IDisposable声明的?Observable.Create

为了争论起见,我们说直接将其返回给用户不是一个选项IObservable(在这种情况下,用户可以自己实现取消)。

0 投票
1 回答
116 浏览

c# - 为什么 Replay 方法会阻止 Observable 收集完成?

我有一个运行缓慢的任务要在项目列表上执行。这就是我想要实现的目标:

  1. 处理应立即开始
  2. 最多 N 个并发任务
  3. 每个项目只能运行一次任务
  4. 所有订阅者都应该看到所有结果(无论他们何时订阅)

这是我的原型 - 它按预期工作(LinqPad)

这是我的实际代码:

“结果”是 Microsoft.CodeAnalysisSolution

我尝试了几种变体
- Projects.ToList,带有 ToObservable 和不
带替换项目加载器,带有和不带 _maxConcurrent 的虚拟负载
Ran

这段代码永远挂在等待中,我似乎看不出在功能上与原型有什么不同。

如果我删除重播,代码会按预期运行,但每个订阅者都会受到打击。

有没有人知道这段代码有何不同以及为什么 Replay 方法会影响这样的执行?

我开始怀疑这一定是订阅者的问题——但这不是我现在可以轻松提取来验证的东西。

这是可以工作的原始代码(但不会节流)

0 投票
1 回答
421 浏览

c# - Hot observable 和 IDisposable

我想找到关于 hot observable 和 IDisposable 对象作为事件类型的最佳实践。

假设我的代码生成 Bitmap 对象作为热可观察对象,并且我有几个订阅者。例如:

所以问题是:处理作为热可观察源的一次性资源的最佳实践是什么?

在这个例子中,我想在使用后处理图像,但我不知道 - 究竟是什么时候?

订阅事件的调用顺序并不明显,因此我无法处理“最后一个”事件。

提前致谢。

0 投票
3 回答
663 浏览

rx-java - Rx:通过匹配的 ID 加入

假设有两个 observables o1, o2。第一个从内部进程接收事件(在很长的计算完成后),第二个通过 REST 端点接收外部事件(表示另一个外部组件也完成了)。事件数据只是一个 ID。

现在我想设计一个工作流,以便只有当两个可观察对象中都存在一个 ID 时,才会发出一个新事件(即当内部和外部计算完成时)。

让某个时间点o1包含 IDs {1,2,3},然后我想区分这些情况:

  1. 正常情况:例如 ID2到达o2。两个 ID 现在都存在于两个 observables 中,输出“Success: 2”
  2. 过期案例:内部计算完成后的一段时间,外部事件尚未到达。例如 ID2存在o1o2即使在一小时后也不存在,输出:“Expired: 2”
  3. 未知情况:ID(例如 4)通过 REST 端点到达,o2但该端点不存在o1,可能是因为 ID 已过期或仅仅是因为外部组件有故障,输出:“未知:3”

我找到了groupJoin可能做我想做的事情的操作符,这里甚至是一个属性匹配的例子:GroupJoin - Joins two streams matching by a attributes

但是,每次新事件到达时,此示例似乎都会对所有元素执行耗尽(线性时间)扫描。我认为可以推出我自己的版本来在恒定时间内检查地图,但是:我想知道是否有一种规范的方式甚至是开箱即用的功能(因为我猜这是一个很常见的用例)。

(而且我是 Rx 的新手,为这种加入操作实现过期案例的最佳方法是什么)

0 投票
1 回答
800 浏览

system.reactive - RX.Net:使用重试但记录任何异常

我是 RX 新手,一直在研究错误处理和重试的使用;我有以下内容(是的,我知道这不是一个“真正的”单元测试,但它给了我摆弄的地方!!)并且想知道我如何去保持重试但能够记录任何异常?

这导致...

...这正是我想要发生的事情,除了我想记录发生在 2 和 3 之间的异常。

有没有办法让订阅者在 OnError 中看到异常(并记录它),然后重新订阅以便它看到 3?

谢谢!