问题标签 [akka.net-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
902 浏览

c# - 从演员到演员发送流(例如文件)

我想将数据从一个演员的流发送到另一个演员的流。最后,这应该与远程参与者一起工作。使用Akka.NET Streams这应该是一件容易的事,但也许我误解了它。

这是SenderActor的一部分:

注意:this.receiverIActorRef数据应该发送到的地方。

ReceiverActor 现在在流结束后获取所有ByteString消息。Messages.StreamCompleted

这怎么能很容易地放在 ReceiverActor 中呢?在最好的情况下Stream再次。

ReceiverActor中,我尝试将所有ByteString消息发送到Source应该填写 a 的 a MemoryStream

但是它MemoryStream是异步填充的,当streamReceiver终止时它会关闭流,所以我无法获取数据。

如何正确检索流?


更新我在本地工作:

感谢来自 Akka.NET 的 gitter 频道中 Horusiath 的输入,我能够直接接收ByteStrings,而不必在那里使用 Akka.Stream 。

和:

请注意,Akka.NET 1.3 会将方法WriteTo(Stream)和添加WriteToAsync(Stream, CancellationToken)ByteString.

这仍然不适用于远程参与者,因为接收参与者系统收到此错误(序列化程序是 Hyperion):

错误 [没有为此对象定义无参数构造函数。]

实际上ByteString有一个无参数的构造函数,但它是protected.

我认为它ByteString不可序列化?

0 投票
1 回答
205 浏览

akka - Akka.NET 流

您将如何每十秒重播一次 Web 服务请求,持续十次,直到它得到响应?

我试过RecoverWithRetriesand InitialDelay,但第一次恢复会立即重播 Web 服务调用:

第一次重试立即发生,而不是十秒后。在 Akka,有一个RestartSource类;我们在 Akka.NET 中没有它。有任何想法吗?

0 投票
1 回答
144 浏览

actor - 在 akka.net 中使用 kinesis 进行消息传递是个好主意吗

我们目前正在 Akka.NET 之上使用 DDD 原则构建一个演员系统。

在如何使我们的服务具有弹性方面,我们有几个缺失点:

  • 默认情况下,Actor 之间的 At-Most-Once-Delivery
  • 演员邮箱的弹性
  • FSMactor 正在存储无法立即处理的传入消息 - 弹性?
  • Pub/Sub 模式(和弹性)

如果某些消息丢失,我们不确定该怎么办,因此我们无法转换到下一个状态以完成涉及多个参与者的请求。

我的想法是使用像 kinesis 这样的事件流系统来传递消息。然后,我们到处都有弹性,只需要知道我们处理了流中的哪个事件。我还缺少其他东西吗?你认为这是个好主意吗?这是否违反了一些最佳实践?

0 投票
1 回答
364 浏览

c# - 为什么我的 Akka.NET 流订阅者收不到消息?

我正在尝试编写一个简单的 Akka.NET 流。源是一个IActorRef. 水槽是一个ISubscriber. 我正在使用 TestKit 将其实现为单元测试:

对该方法的初始Verify调用OnSubscribe顺利通过,但模拟订阅者从未收到对OnNext.

我究竟做错了什么?

运行为netcoreapp2.0. 参考:

0 投票
0 回答
224 浏览

c# - Akka.net 期刊阅读器丢失事件

在我们的应用程序中,我们使用带有事件溯源的 Akka.net。持久参与者将他们的事件保存在 SQL Server 数据库中。我们还有视图参与者,它们订阅这些事件,使用日志阅读器/持久性查询来创建物化视图。我们在数据库中有一个表,每个视图参与者都有一行。此行包含视图参与者的名称和最后处理的事件的偏移量。乍一看,这工作顺利。然而,有时,当我们运行导致数千个事件的测试时,期刊阅读器会丢失一些事件。

View Actor 是一个 ReceiveActor。启动时,它从数据库中检索最后处理的事件偏移量(从参与者的构造函数中调用)。偏移量在 OffsetMessage 中通过管道传递给 self。在接收到 OffsetMessage 时,视图 Actor 会初始化日志阅读器。在接收事件时(在 EventEnvelope 消息中),视图会更新。

从日志阅读器运行的操作首先将一行写入日志。该行包含事件偏移量。EventEnvelope 接收处理程序还将一行写入日志。该行还包含事件偏移量。

我们有一个测试导致 9635 事件插入到日志中。有时,日志阅读器和 EventEnvelope 接收处理程序记录的事件少于 9635 个。他们都记录了相同的数字,所以期刊读者似乎错过了这些事件。日志中丢失的事件对应于视图中的丢失项。我们在一个空数据库上运行测试。日志记录处于调试级别,不显示异常。丢失的事件(我们已经看到了 1 到 4 的数字)可以是第一个、中间或最后一个事件。每次这都不一样。

到目前为止,我们不知道是什么导致了这个问题,或者如何解决它。

以下是我们的代码片段。视图 Actor 都继承自一个基类:ViewActorBase。

我们的代码或架构有问题吗?有更好的解决方案吗?

附加信息 我们已经使用 SQL Server 探查器运行了一些测试来监视对数据库的查询。

对事件日志执行查询,要求 100 个事件,从偏移量 204743 开始。结果包含 61 行。

我们将下一个查询扩展为从 204804 (204743 + 61) 开始。但是,它从 204810 开始。为什么它会跳过(或丢失)6 个事件?

0 投票
1 回答
137 浏览

akka.net-streams - 如何将项目排入 akka.net 队列源

在 akka.net 流中创建队列源后,如何将项目添加到队列中?创建的对象中没有 Enqueue、Add 或 Offer 方法。

0 投票
1 回答
93 浏览

c# - 创建异步流源

我有一个昂贵的方法来调用创建一批源项目:

我只想在没有要处理的项目(或低于某个阈值)时填充新项目。到目前为止,我无法弄清楚要使用哪种Source方法。

我已经实现了一个会不断返回新项目的原始流:

我预计Source.Task会起作用,但看起来它只调用一次。如何为这种情况创建源?

0 投票
1 回答
85 浏览

c# - Akka.Streams.Amqp.IncomingMessage.Envelope.RoutingKey 值不正确

我正在尝试RabbitMQ通过Akka.streams.Ampq源阅读传入的消息,但RoutingKey不正确。

另一个令人担忧的问题是信封不包含交换名称。

实际行为:我得到了 预期的行为:我期望得到QueueName 哪个是我的班级,例如。"Tax_Queue"QueueNameMyTestClass RoutingKey

0 投票
2 回答
133 浏览

f# - 如何在 F# 中使用 Akka.Streams.*.ConcatMany?

我想创建一个流,从传入的元素中创建一个新的源(它将是一个持久性查询),然后将结果展平。像这个简化的例子:

此代码按预期编译和工作。我的问题是,当我将它翻译成 F# 时:

我收到一条错误消息

我认为其原因是 F# 处理协/逆变的方式与 C# 不同,并且不能简单地转换这些通用专业化(https://github.com/fsharp/fslang-suggestions/issues/162),但我无法弄清楚在 anint和 a之间进行转换的方法SourceShape<int>。是否可以将此示例转换为 F#?

0 投票
1 回答
317 浏览

c# - Akka.Net:网络上的反应式流

我试图在AkkaStreamsModel 中分别实现 Source 作为源和 Sink 作为远程,然后将 Sink 作为源和源作为远程,Type 1 和 Type 2 工作流。

类型 1 创建 aSourceRef并将其作为传入数据的公开源传递给远程 Actor 到接收器 Actor。类型 2 创建 aSinkRef并将其作为暴露的接收器传递给远程参与者以开始向其发送数据。为了进一步解释,我已将 Type 1 ( AkkaStreams ) 和 Type 2 ( AkkaStreamsFaulty ) 工作流上传到 GitHub。

类型 1 工作流已设置并正常工作,但是,类型 2 工作流有故障。我已将问题缩小到演员Receive<PrepareUpload>()内部的方法。DataReceiver在成功创建接收器后,使用该方法将流和接收器(通过MeasurementsSinkReady消息)发送到源参与者PipeTo()

谁能指出我在 Type 2 工作流程中可能出错的地方?我已经用尽了所有其他选项(Akka.Net 文档Akka.Net Gitter,各种博客和视频等)。我不确定从这里去哪里,任何帮助将不胜感激。

谢谢你。

编辑(根据Bartosz Sypytkowski 的评论添加了额外信息)

数据永远不会在两个参与者之间远程发送,这就是问题所在。使用类型 1,aSourceRef被建立并发送到远程参与者。远程参与者收到此消息。然而,对于类型 2,它SinkRef被创建但永远不会到达远程参与者。因此,我认为问题出在SinkReforPipeTo()方法的生成上。这是因为如果您在参与者之间发送消息,它可以工作,但如果消息以SinkRef正确设置为条件,那么设置中出现了问题。