问题标签 [rx.net]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
system.reactive - Reactive extensions(Rx) Switch() 产生新的 observable 未订阅提供的 OnCompleted()
我使用 Switch 语句的 Rx 订阅有问题。
流程是:
- 一些属性发生变化并且调用 performSearchSubject.OnNext
- 调用 PerformPositionQuery(),每次命中都会返回一个观察者
- 通过此观察者响应的服务在数据接收完成时调用 OnNext 两次和 OnCompleted 一次
- 方法 DataArrivedForPositions 被按预期调用两次
- 永远不会调用方法 PositionQueryCompleted,尽管在我的数据服务中调用了observer.OnCompleted()。
数据服务的代码是:
c# - Rx:移动窗口中的分组事件计数
我已经开始考虑将 Reactive Extensions 与 EventStore 一起使用。作为概念证明,我想看看我是否可以让 Rx 使用事件流并输出按类型分组的事件计数,时间为一秒。
因此,假设我正在使用名为“orders”的流,我希望在控制台中看到类似以下内容:
(第二次通过..)
等等。
到目前为止,我已经能够获得每秒所有事件计数的输出。但似乎无法按事件类型对它们进行分组。
我使用的代码是基于 James Nugent 的一个要点:
rx-java - Rx:一个类似 zip 的运算符,在其中一个流结束后继续?
我正在寻找组合异步开始和结束的流(observables):
我需要它:将音频流添加在一起。它们是音频“块”流,但我将在这里用整数表示它们。所以播放了第一个剪辑:
然后第二个开始,稍晚一点:
将它们按总和组合的结果应该是:
但是,如果任何压缩流结束,标准 zip 就会完成。即使其中一个流结束,我也希望这个 optional_zip 继续运行。有没有办法在 Rx 中做到这一点,或者我必须通过修改现有的 Zip 自己实现它?
注意:我使用的是 RxPy,但这里的社区似乎很小,而且 Rx 运算符似乎在各种语言中都很通用,所以我也将它标记为 rx-java 和 rx-js。
c# - 为音频流时间使用自定义调度程序
在声音处理应用程序中,许多事件是在音频流时间(在第一个近似值中,只是消耗样本的计数乘以一个系数)而不是系统时间,在这种情况下通常称为壁时间。例如,通常希望根据流时间对未来事件进行计时。例如,如果在一定时间内没有语音活动,则可以安排计时器接受录音;为此目的,流时间比挂钟时间更好地表示时间。
我正在考虑使用HistoricalScheduler
为此类活动计时。据我了解,它在调用AdvanceBy/To
. 此外,整个应用程序都在我们的控制之下,从某种意义上说,没有第 3 方部分——至少这样会使用 RX。
问题 1:这个想法是正确的、好的还是非常愚蠢的?我理解RX,没有经验,如果你愿意的话,直觉。
流通过 .NET 事件回调传递。传递流缓冲区(并调用事件)的线程应该尽可能少地被阻塞。音频被传输到一个有限长度的无限缓冲区中,这个实际的缓冲将发生在一个观察者中,在一个专用线程上观察这个事件,这就是调度器时间提前的地方。本质上,这归结为
问题 2:我觉得从观察函数中推进调度程序很奇怪,如上例所示。这是一种可接受的做法,还是立即闻起来有腥味(或者它可能介于两个极端之间)?
另一个可能的复杂情况是流最终停止生成数据,流时间也是如此。
问题 3:如果有的话,在调度程序中停止时间意味着什么,这样一些调度的操作将永远不会执行并且将永远保留在队列中?简单地删除对调度程序的引用会导致一次性资源泄漏吗?
并且流时间也不同于挂钟时间。
问题 4:如果有的话,不同的调度程序提供不连贯的概念意味着Now
什么?
我在这里最好的猜测是“根本没有”,因为HistoricScheduler
在其他情况下,实时调度程序和平共存,但我不确定我是否足够了解 RX 的这一部分,所以也不要问这个问题。
c# - C# Reactive Extensions - 遍历 IObservable 项>
我最近开始使用C#rx
版本,想知道如何解决以下问题:
我正在使用refit通过以下方式从服务器获取项目列表:
我想在之后处理每个项目,但我没有找到如何做到这一点。我知道在 RxJava 中有一个名为的运算符flatMapIterable()
,它允许我处理每个项目,但我没有找到与C#类似的东西。
谢谢
android - 更新视图 - 反应式扩展
我有以下代码:
Observable.Interval(TimeSpan.FromMilliseconds(2500)).SubscribeOn(XXX).ObserveOn(YYY).Subscribe(t => SendCounter(t), e => HandleException(e));
其中 XXX, YYY 是Schedulers
。
在 SendCounter(t) 中,我设置了一个带有 t 值的文本。
问题是当我运行代码时出现此错误:
c# - 如何将轮询系统变成 Rx.Net IObservable?
我有一个游戏(基于 MonoGame / XNA),其更新方法如下:
我想将其转换为反应模式。我目前的解决方案是:
我是 Rx 的新手,所以我仍在学习最好的做事方式。我读到Subject
应该避免,Observable.Create
应该使用它。
Subject
这里合适吗?
在这种情况下我该如何使用Observable.Create
?
c# - 使用响应式扩展动态连接序列
我想创建一个序列,它连接一个或多个动态创建的序列(在运行时)。
我试过了mySequence = mySequence.Concat(anotherSequence)
,但这会破坏当前的订阅,mySequence
因为每次都会创建一个新序列。
c# - 如何打开 IObservable> 进入 IObservable>?
我有一个鼠标左键状态的流:
然后我Window
给我一个可观察的表示鼠标拖动的可观察对象:
现在我想制作一个流leftMouseDrag
,给出点列表。每次用户完成拖动(LMB 向下 -> 移动 -> LMB 向上)时,它应该触发鼠标移动的位置列表。
我如何把一个IObservable<IObservable<Point>>
变成一个IObservable<IEnumerable<Point>>
?