问题标签 [rx.net]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
172 浏览

c# - LinqToTwitter - 如何在考虑并发的情况下正确处理可观察流

我使用 LinqToTwitter 创建了一个 observable 集合IObservable<Tweet>,如下所示。问题是当我处理第一个可观察对象并订阅新的可观察对象时,此实现存在并发问题。

如何正确处理第一个 observable?

(下面的示例应该是完整的并且可以正常工作,只需添加引用的包和 Twitter 凭据。)

这是发生此问题的示例:

如果在StartAsync创建第二个 observable 之前执行第一个 observable 创建的闭包中的方法,那么disposed将设​​置为true并且一切都很好。

但是如果第二个 observable 是在下一次执行第一个闭包之前创建的,StartAsync disposed则再次将其设置为 false 并且s.CloseStream();永远不会被调用。

下面是 observable 的创建:

最后是Tweet课程:

0 投票
1 回答
1435 浏览

c# - 调度程序:立即与 CurrentThread

在阅读了原因的解释后

永远不会完成,但是

按预期工作。我仍然很困惑,我不知道为什么CurrentThread实际上解决了这个问题。有人可以给出明确的解释吗?

0 投票
3 回答
2508 浏览

c# - 使用 Reactive Extensions (RX),是否可以添加“暂停”命令?

我有一个类,它接收一个事件流,然后推出另一个事件流。

所有事件都使用响应式扩展 (RX)。传入的事件流从外部源推送到IObserver<T>using .OnNext,而传出的事件流使用IObservable<T>and推送出去.Subscribe。我Subject<T>用来管理这个,在幕后。

我想知道 RX 中有什么技术可以暂时暂停输出。这意味着传入事件将在内部队列中建立,当它们被取消暂停时,事件将再次流出。

0 投票
4 回答
2027 浏览

c# - 使用 Rx 运行周期性任务的好方法是什么,具有单个并发执行限制?

我想在一个限制条件下运行定期任务,即在任何给定时间最多只运行一个方法的执行。

我正在尝试使用 Rx,但我不确定如何最多施加一次并发限制。


此外,如果任务仍在运行,我希望后续计划过去。即我不希望任务排队并导致问题。

我有 2 个这样的任务要定期执行。正在执行的任务当前是同步的。但是,如果有必要,我可以让它们异步。

0 投票
1 回答
41 浏览

system.reactive - Using Reactive Extensions, how can I ignore a sequence of characters based on delimiters?

I have an app that uses Rx to receive data from a device on the serial port. So I have an IObservable<char> that I slice and dice into various strings. However, the device vendor added some debugging information that is enclosed in braces:

interesting stuff {debug stuff} interesting stuff

I need to filter out (discard, ignore) the {debug stuff} from my character sequence?. Is there a simple way to do that? "When you see this character, ignore elements until you see this other character".

I looked at Until but that would terminate the sequence and I don't want that to happen...

0 投票
0 回答
1213 浏览

c# - 通过 FromAsyncPattern 使用 Observable 从 Stream 读取,如何正确关闭/取消

需要:具有 TCP 连接的长时间运行程序

AC# 4.0 (VS1010, XP) 程序需要使用 TCP 连接到主机,发送和接收字节,有时会正确关闭连接并稍后重新打开。周围的代码是使用 Rx.NetObservable风格编写的。数据量很低,但程序应该连续运行(通过妥善处理资源来避免内存泄漏)。

下面的文字很长,因为我解释了我搜索和发现的内容。它现在似乎起作用了。

总体问题是:由于 Rx 有时不直观,解决方案是否良好?那会可靠吗(比如说,它可以运行多年而不会出现问题)吗?

到目前为止的解决方案

发送

该程序获得NetworkStream这样的:

异步发送很容易。Rx.Net 允许使用比传统解决方案更短、更简洁的代码来处理这个问题。我用EventLoopScheduler. 需要发送的操作用 表示IObservable。使用ObserveOn(sendRecvThreadScheduler)保证所有发送操作都在该线程上完成。

到目前为止,这是出色且完美的。

收到

看起来,为了接收数据,Rx.Net 还应该允许比传统解决方案更短、更简洁的代码。在阅读了几个资源(例如http://www.introtorx.com/)和 stackoverflow 之后,似乎一个非常简单的解决方案是将异步编程桥接到 Rx.Net 就像在https://stackoverflow.com/a/14464068 /1429390

它主要工作。我可以发送和接收字节。

关闭时间

这是事情开始出错的时候。

有时我需要关闭流并保持清洁。基本上这意味着:停止读取,结束接收字节的 observable,打开一个新的连接。

一方面,当远程主机强制关闭连接时,BeginRead()/EndRead()立即循环消耗所有返回零字节的 CPU。我让更高级别的代码注意到这一点(在高级元素可用的上下文中)和清理(包括关闭和处理流)Subscribe()ReadObservable这也很有效,我负责处理Subscribe().

有时,我只需要关闭流。但显然这必须导致在异步读取中抛出异常。c# - 过早中止 BeginRead 和 BeginWrite 的正确方法?- 堆栈溢出

我添加了一个CancellationToken导致Observable.While()序列结束的。这对避免这些异常没有多大帮助,因为BeginRead()可以睡很长时间。

observable 中未处理的异常导致程序退出。搜索提供的.net - 出现异常后继续使用订阅 - 堆栈溢出建议添加一个Catch以有效地恢复损坏Observable的空的。

代码如下所示:

现在怎么办?问题

这似乎运作良好。检测到远程主机强行关闭连接或不再可达的情况,导致更高级别的代码关闭连接并重试。到目前为止,一切都很好。

我不确定事情是否完全正确。

一方面,该行:

感觉就像命令式代码中空 catch 块的不好做法。实际代码确实记录了异常,更高级别的代码检测到没有回复并正确处理,所以应该认为它还可以(见下文)?

此外,这确实比大多数传统解决方案要短。

解决方案是正确的还是我错过了一些更简单/更清洁的方法?

是否有一些可怕的问题对于响应式扩展的向导来说是显而易见的?

感谢您的关注。

0 投票
2 回答
341 浏览

c# - 如何在特定条件下使用滑动窗口触发 RX 信号

我有一个热的可观察传感器数据流。我需要一个可观察的信号,该信号仅在传感器值低于15 一段时间后触发。如果在任何时候该值超过 15,它应该重置滑动窗口。我已经使用下面的代码使其部分工作 - 但是如果值始终低于 15,它不会触发。

有什么建议么?

0 投票
1 回答
75 浏览

.net - 如何在 Timeout 扩展中获取最新的序列元素?

我想知道在超时触发之前获取序列中最新元素的最佳方法是什么?

我有一个不时 ping 远程服务的代码,我希望能够识别一个已脱机的服务。

使用超时扩展我想出了这个:

这有点工作,但它不允许我找到哪个服务已经消失。我想要的是 Timeout 扩展,其中包含流中的最新消息作为参数,以在它产生的错误消息中提供一些信息。

如何在 Timeout 扩展中获取最新的序列元素?

0 投票
2 回答
517 浏览

c# - 在 MVVM 中使用 Rx 跟踪集合更改

我有一个使用 MVVM 模式编写的跨平台项目(没有使用特定的框架,只是自己编写的实现)。项目有几个独立的模块,每个模块都有几个页面。每个页面都有 ViewModel 和某种负责面向数据的逻辑(获取、保存、删除、转换等)的管理器。所以数据流看起来是这样的:

虚拟机 -> 管理器 -> 服务 -> 管理器 -> 虚拟机

加载 VM 时,它会向管理器询问数据。Manager 执行服务调用,获取数据,从 DTO 构造模型集合,将此集合返回给 ViewModel,ViewModel 将模型集合转换为要在列表中呈现的 ViewModel 集合。

现在我正在寻找一种使用 Rx 实现这种逻辑的方法。大多数页面都有一个要编辑的主列表(插入、删除、修改的项目)和几个支持集合(某些组合框的提供者可以从中选择值)。支持集合可以通过标准的 async/await 调用或通过将任务转换为 Rx 轻松检索 - 它们没有问题。但可修改的列表是。在不破坏 Rx 逻辑的情况下,我无法弄清楚如何在页面的整个生命周期内跟踪此列表中的更改。我有订阅选项:

  • IEnumerable<Model>
  • Task<IEnumerable<Model>>
  • IObservable<IEnumerable<Model>>

但我想我必须订阅,IObservable<Model>因为我需要一种方法来跟踪个人更改。我需要一种从其他方法(如添加、删除或编辑)修改此集合的方法。那么我应该IObservable通过Observable.Create(或其他方法)创建并存储IObserver在 Manager 内的某个地方以调用OnNextOnError其他方法吗?但它看起来不像是 Rx 的做法。你对我的问题有什么建议吗?任何建议表示赞赏。谢谢。

PS:你可能会说 Rx 不是解决我跟踪可修改列表问题的最佳方法,因为它不是无穷无尽的事件流,我必须自己推送修改,但 Rx 有非常方便的过滤数据和处理错误的方法,所以我'我真的很期待在应用程序中实现它。

0 投票
2 回答
559 浏览

c# - 使用响应式套接字重复连接/断开连接时的内存泄漏

我正在使用我在 GitHub ( https://github.com/clariuslabs/reactivesockets ) 中找到的响应式套接字库,我想知道当客户端连接和断开连接时是否有人经历过内存泄漏。

我正在运行库提供的 ReactiveServer 示例,它基本上包括:

然后我创建了一个带有循环的控制台应用程序,并在这个循环中创建了一个 ReactiveClient,然后连接到服务器,等待 200 毫秒然后断开连接。客户端不向服务器发送任何数据。这是代码:

我在 Visual Studio 的内存分析器中看到的是,当我运行我的测试应用程序时,服务器的内存使用量呈线性增长。当测试应用停止时,服务器的内存使用量并没有下降;它保持在它达到的任何水平。

当我在没有实例化 StringChannel 类并订阅 Receiver 的代码的情况下运行 ReactiveServer 示例时,似乎没有发生内存泄漏(或者至少不太清楚,内存使用量呈线性增长)。代码将是这样的:

出于这个原因,我怀疑内存泄漏可能与protocol.Receiver.Subscribe客户端断开连接时未正确处理协议接收器的订阅(执行时)有关。阅读有关 Rx.NET 的更多详细信息,我已经阅读过,在正常情况下(如果可观察序列完成),则订阅会自动处置,但如果序列未完成,则不会处置。我怀疑当客户端断开连接然后订阅没有被处理时,序列没有完成。我试图在 Disconnected 事件处理程序中处理订阅,但我没有看到任何明显的改进。

非常感谢任何想法或建议。

谢谢。