问题标签 [akka.net-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
c# - 从演员到演员发送流(例如文件)
我想将数据从一个演员的流发送到另一个演员的流。最后,这应该与远程参与者一起工作。使用Akka.NET Streams这应该是一件容易的事,但也许我误解了它。
这是SenderActor的一部分:
注意:this.receiver
是IActorRef
数据应该发送到的地方。
ReceiverActor 现在在流结束后获取所有ByteString
消息。Messages.StreamCompleted
这怎么能很容易地放在 ReceiverActor 中呢?在最好的情况下Stream
再次。
在ReceiverActor中,我尝试将所有ByteString
消息发送到Source
应该填写 a 的 a MemoryStream
:
但是它MemoryStream
是异步填充的,当streamReceiver
终止时它会关闭流,所以我无法获取数据。
如何正确检索流?
更新我在本地工作:
感谢来自 Akka.NET 的 gitter 频道中 Horusiath 的输入,我能够直接接收ByteStrings
,而不必在那里使用 Akka.Stream 。
和:
请注意,Akka.NET 1.3 会将方法WriteTo(Stream)
和添加WriteToAsync(Stream, CancellationToken)
到ByteString
.
这仍然不适用于远程参与者,因为接收参与者系统收到此错误(序列化程序是 Hyperion):
错误 [没有为此对象定义无参数构造函数。]
实际上ByteString
有一个无参数的构造函数,但它是protected
.
我认为它ByteString
不可序列化?
akka - Akka.NET 流
您将如何每十秒重播一次 Web 服务请求,持续十次,直到它得到响应?
我试过RecoverWithRetries
and InitialDelay
,但第一次恢复会立即重播 Web 服务调用:
第一次重试立即发生,而不是十秒后。在 Akka,有一个RestartSource
类;我们在 Akka.NET 中没有它。有任何想法吗?
actor - 在 akka.net 中使用 kinesis 进行消息传递是个好主意吗
我们目前正在 Akka.NET 之上使用 DDD 原则构建一个演员系统。
在如何使我们的服务具有弹性方面,我们有几个缺失点:
- 默认情况下,Actor 之间的 At-Most-Once-Delivery
- 演员邮箱的弹性
- FSMactor 正在存储无法立即处理的传入消息 - 弹性?
- Pub/Sub 模式(和弹性)
如果某些消息丢失,我们不确定该怎么办,因此我们无法转换到下一个状态以完成涉及多个参与者的请求。
我的想法是使用像 kinesis 这样的事件流系统来传递消息。然后,我们到处都有弹性,只需要知道我们处理了流中的哪个事件。我还缺少其他东西吗?你认为这是个好主意吗?这是否违反了一些最佳实践?
c# - 为什么我的 Akka.NET 流订阅者收不到消息?
我正在尝试编写一个简单的 Akka.NET 流。源是一个IActorRef
. 水槽是一个ISubscriber
. 我正在使用 TestKit 将其实现为单元测试:
对该方法的初始Verify
调用OnSubscribe
顺利通过,但模拟订阅者从未收到对OnNext
.
我究竟做错了什么?
运行为netcoreapp2.0
. 参考:
c# - Akka.net 期刊阅读器丢失事件
在我们的应用程序中,我们使用带有事件溯源的 Akka.net。持久参与者将他们的事件保存在 SQL Server 数据库中。我们还有视图参与者,它们订阅这些事件,使用日志阅读器/持久性查询来创建物化视图。我们在数据库中有一个表,每个视图参与者都有一行。此行包含视图参与者的名称和最后处理的事件的偏移量。乍一看,这工作顺利。然而,有时,当我们运行导致数千个事件的测试时,期刊阅读器会丢失一些事件。
View Actor 是一个 ReceiveActor。启动时,它从数据库中检索最后处理的事件偏移量(从参与者的构造函数中调用)。偏移量在 OffsetMessage 中通过管道传递给 self。在接收到 OffsetMessage 时,视图 Actor 会初始化日志阅读器。在接收事件时(在 EventEnvelope 消息中),视图会更新。
从日志阅读器运行的操作首先将一行写入日志。该行包含事件偏移量。EventEnvelope 接收处理程序还将一行写入日志。该行还包含事件偏移量。
我们有一个测试导致 9635 事件插入到日志中。有时,日志阅读器和 EventEnvelope 接收处理程序记录的事件少于 9635 个。他们都记录了相同的数字,所以期刊读者似乎错过了这些事件。日志中丢失的事件对应于视图中的丢失项。我们在一个空数据库上运行测试。日志记录处于调试级别,不显示异常。丢失的事件(我们已经看到了 1 到 4 的数字)可以是第一个、中间或最后一个事件。每次这都不一样。
到目前为止,我们不知道是什么导致了这个问题,或者如何解决它。
以下是我们的代码片段。视图 Actor 都继承自一个基类:ViewActorBase。
我们的代码或架构有问题吗?有更好的解决方案吗?
附加信息 我们已经使用 SQL Server 探查器运行了一些测试来监视对数据库的查询。
对事件日志执行查询,要求 100 个事件,从偏移量 204743 开始。结果包含 61 行。
我们将下一个查询扩展为从 204804 (204743 + 61) 开始。但是,它从 204810 开始。为什么它会跳过(或丢失)6 个事件?
akka.net-streams - 如何将项目排入 akka.net 队列源
在 akka.net 流中创建队列源后,如何将项目添加到队列中?创建的对象中没有 Enqueue、Add 或 Offer 方法。
c# - 创建异步流源
我有一个昂贵的方法来调用创建一批源项目:
我只想在没有要处理的项目(或低于某个阈值)时填充新项目。到目前为止,我无法弄清楚要使用哪种Source方法。
我已经实现了一个会不断返回新项目的原始流:
我预计Source.Task
会起作用,但看起来它只调用一次。如何为这种情况创建源?
c# - Akka.Streams.Amqp.IncomingMessage.Envelope.RoutingKey 值不正确
我正在尝试RabbitMQ
通过Akka.streams.Ampq
源阅读传入的消息,但RoutingKey
不正确。
另一个令人担忧的问题是信封不包含交换名称。
实际行为:我得到了
预期的行为:我期望得到QueueName
哪个是我的班级,例如。"Tax_Queue"
QueueName
MyTestClass
RoutingKey
f# - 如何在 F# 中使用 Akka.Streams.*.ConcatMany?
我想创建一个流,从传入的元素中创建一个新的源(它将是一个持久性查询),然后将结果展平。像这个简化的例子:
此代码按预期编译和工作。我的问题是,当我将它翻译成 F# 时:
我收到一条错误消息
我认为其原因是 F# 处理协/逆变的方式与 C# 不同,并且不能简单地转换这些通用专业化(https://github.com/fsharp/fslang-suggestions/issues/162),但我无法弄清楚在 anint
和 a之间进行转换的方法SourceShape<int>
。是否可以将此示例转换为 F#?
c# - Akka.Net:网络上的反应式流
我试图在AkkaStreamsModel 中分别实现 Source 作为源和 Sink 作为远程,然后将 Sink 作为源和源作为远程,Type 1 和 Type 2 工作流。
类型 1 创建 aSourceRef
并将其作为传入数据的公开源传递给远程 Actor 到接收器 Actor。类型 2 创建 aSinkRef
并将其作为暴露的接收器传递给远程参与者以开始向其发送数据。为了进一步解释,我已将 Type 1 ( AkkaStreams ) 和 Type 2 ( AkkaStreamsFaulty ) 工作流上传到 GitHub。
类型 1 工作流已设置并正常工作,但是,类型 2 工作流有故障。我已将问题缩小到演员Receive<PrepareUpload>()
内部的方法。DataReceiver
在成功创建接收器后,使用该方法将流和接收器(通过MeasurementsSinkReady
消息)发送到源参与者PipeTo()
。
谁能指出我在 Type 2 工作流程中可能出错的地方?我已经用尽了所有其他选项(Akka.Net 文档;Akka.Net Gitter,各种博客和视频等)。我不确定从这里去哪里,任何帮助将不胜感激。
谢谢你。
编辑(根据Bartosz Sypytkowski 的评论添加了额外信息)
数据永远不会在两个参与者之间远程发送,这就是问题所在。使用类型 1,aSourceRef
被建立并发送到远程参与者。远程参与者收到此消息。然而,对于类型 2,它SinkRef
被创建但永远不会到达远程参与者。因此,我认为问题出在SinkRef
orPipeTo()
方法的生成上。这是因为如果您在参与者之间发送消息,它可以工作,但如果消息以SinkRef
正确设置为条件,那么设置中出现了问题。