问题标签 [rx.net]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
292 浏览

system.reactive - Reactive extensions(Rx) Switch() 产生新的 observable 未订阅提供的 OnCompleted()

我使用 Switch 语句的 Rx 订阅有问题。

流程是:

  1. 一些属性发生变化并且调用 performSearchSubject.OnNext
  2. 调用 PerformPositionQuery(),每次命中都会返回一个观察者
  3. 通过此观察者响应的服务在数据接收完成时调用 OnNext 两次和 OnCompleted 一次
  4. 方法 DataArrivedForPositions 被按预期调用两次
  5. 永远不会调用方法 PositionQueryCompleted,尽管在我的数据服务中调用了observer.OnCompleted()。

数据服务的代码是:

0 投票
1 回答
484 浏览

c# - Rx:移动窗口中的分组事件计数

我已经开始考虑将 Reactive Extensions 与 EventStore 一起使用。作为概念证明,我想看看我是否可以让 Rx 使用事件流并输出按类型分组的事件计数,时间为一秒。

因此,假设我正在使用名为“orders”的流,我希望在控制台中看到类似以下内容:

(第二次通过..)

等等。

到目前为止,我已经能够获得每秒所有事件计数的输出。但似乎无法按事件类型对它们进行分组。

我使用的代码是基于 James Nugent 的一个要点

0 投票
2 回答
2485 浏览

rx-java - Rx:一个类似 zip 的运算符,在其中一个流结束后继续?

我正在寻找组合异步开始和结束的流(observables):

我需要它:将音频流添加在一起。它们是音频“块”流,但我将在这里用整数表示它们。所以播放了第一个剪辑:

然后第二个开始,稍晚一点:

将它们按总和组合的结果应该是:

但是,如果任何压缩流结束,标准 zip 就会完成。即使其中一个流结束,我也希望这个 optional_zip 继续运行。有没有办法在 Rx 中做到这一点,或者我必须通过修改现有的 Zip 自己实现它?

注意:我使用的是 RxPy,但这里的社区似乎很小,而且 Rx 运算符似乎在各种语言中都很通用,所以我也将它标记为 rx-java 和 rx-js。

0 投票
0 回答
96 浏览

c# - 为音频流时间使用自定义调度程序

在声音处理应用程序中,许多事件是在音频流时间(在第一个近似值中,只是消耗样本的计数乘以一个系数)而不是系统时间,在这种情况下通常称为壁时间。例如,通常希望根据流时间对未来事件进行计时。例如,如果在一定时间内没有语音活动,则可以安排计时器接受录音;为此目的,流时间比挂钟时间更好地表示时间。

我正在考虑使用HistoricalScheduler为此类活动计时。据我了解,它在调用AdvanceBy/To. 此外,整个应用程序都在我们的控制之下,从某种意义上说,没有第 3 方部分——至少这样会使用 RX。

问题 1:这个想法是正确的、好的还是非常愚蠢的?我理解RX,没有经验,如果你愿意的话,直觉。

流通过 .NET 事件回调传递。传递流缓冲区(并调用事件)的线程应该尽可能少地被阻塞。音频被传输到一个有限长度的无限缓冲区中,这个实际的缓冲将发生在一个观察者中,在一个专用线程上观察这个事件,这就是调度器时间提前的地方。本质上,这归结为

问题 2:我觉得从观察函数中推进调度程序很奇怪,如上例所示。这是一种可接受的做法,还是立即闻起来有腥味(或者它可能介于两个极端之间)?

另一个可能的复杂情况是流最终停止生成数据,流时间也是如此。

问题 3:如果有的话,在调度程序中停止时间意味着什么,这样一些调度的操作将永远不会执行并且将永远保留在队列中?简单地删除对调度程序的引用会导致一次性资源泄漏吗?

并且流时间也不同于挂钟时间。

问题 4:如果有的话,不同的调度程序提供不连贯的概念意味着Now什么?

我在这里最好的猜测是“根本没有”,因为HistoricScheduler在其他情况下,实时调度程序和平共存,但我不确定我是否足够了解 RX 的这一部分,所以也不要问这个问题。

0 投票
2 回答
1125 浏览

c# - C# Reactive Extensions - 遍历 IObservable 项>

我最近开始使用C#rx版本,想知道如何解决以下问题:

我正在使用refit通过以下方式从服务器获取项目列表:

我想在之后处理每个项目,但我没有找到如何做到这一点。我知道在 RxJava 中有一个名为的运算符flatMapIterable(),它允许我处理每个项目,但我没有找到与C#类似的东西。

谢谢

0 投票
2 回答
402 浏览

c# - Rx.Net 消息解析器

我正在尝试解析表示消息的传入字节流。我需要拆分流并为每个部分创建一个消息结构。

消息始终以 0x81 (BOM) 开头并以 0x82 (EOM) 结尾。

数据部分使用转义字节 0x1B (ESC) 进行转义:只要数据部分中的字节包含控制字节 {ESC, BOM, EOM} 之一,它就会以 ESC 为前缀。

标头部分没有转义,并且可能包含控制字节。

我想使用 Rx.Net 以功能性反应风格对此进行编码,方法是使用 anIObservable<byte>并将其转换为IObservable<Message>.

最惯用的方法是什么?

一些例子:

这是一个状态机绘图: 在此处输入图像描述

0 投票
3 回答
191 浏览

android - 更新视图 - 反应式扩展

我有以下代码:

Observable.Interval(TimeSpan.FromMilliseconds(2500)).SubscribeOn(XXX).ObserveOn(YYY).Subscribe(t => SendCounter(t), e => HandleException(e));

其中 XXX, YYY 是Schedulers

在 SendCounter(t) 中,我设置了一个带有 t 值的文本。

问题是当我运行代码时出现此错误:

我正在使用这个组件:https ://components.xamarin.com/view/rxforxamarin

0 投票
2 回答
533 浏览

c# - 如何将轮询系统变成 Rx.Net IObservable?

我有一个游戏(基于 MonoGame / XNA),其更新方法如下:

我想将其转换为反应模式。我目前的解决方案是:

我是 Rx 的新手,所以我仍在学习最好的做事方式。我读到Subject应该避免,Observable.Create应该使用它。

Subject这里合适吗?

在这种情况下我该如何使用Observable.Create

0 投票
1 回答
431 浏览

c# - 使用响应式扩展动态连接序列

我想创建一个序列,它连接一个或多个动态创建的序列(在运行时)。

我试过了mySequence = mySequence.Concat(anotherSequence),但这会破坏当前的订阅,mySequence因为每次都会创建一个新序列。

0 投票
1 回答
153 浏览

c# - 如何打开 IObservable> 进入 IObservable>?

我有一个鼠标左键状态的流:

然后我Window给我一个可观察的表示鼠标拖动的可观察对象:

现在我想制作一个流leftMouseDrag,给出点列表。每次用户完成拖动(LMB 向下 -> 移动 -> LMB 向上)时,它应该触发鼠标移动的位置列表。

我如何把一个IObservable<IObservable<Point>>变成一个IObservable<IEnumerable<Point>>