问题标签 [akka-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 有没有像 SinkSource[T] 这样的东西?
我正在寻找SinkSource
提供 aSink
和 a 的 a Source
。如果元素流入该元素,Sink
则应在相应的Source
. 以下代码说明了我的意思:
如果执行这应该打印:12345
那么,是否SinkSource
存在(在 API 中没有看到它)或者有谁知道如何实现它?我应该提到我需要独特的访问权限Sink
,Source
因此这Flow
不是这种特定形式的解决方案:
scala - 如何发布或订阅物化的 Akka Stream 流程图?
我正在玩 Akka Stream,我试图在实现后弄清楚它的灵活性。
一种方法是使用低级反应流 API: http ://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/#akka.stream.scaladsl.PublisherSource
但是,您需要定义这些点以发布或订阅。有没有办法发布或订阅任意物化流图节点?这应该是可能的,因为物化流图只不过是参与者的集合。
例如:首先部署流程图1:A ~> B ~> C
然后,部署流程图 2 和 3:D ~> BB ~> E
scala - 'Graph must be connected' 与从 Publisher+Subscriber 演员创建的 Flow
根据Flow.apply(Sink, Source)
文档:
从看似断开的 Source 和 Sink 对创建流。
如果这是真的,为什么图仍然没有连接?
akka - Akka 流式传输 1.0-M4 更新后缺少类
我从更新
至
但是没有像 UndefinedSource、UndefinedSink、BlackholeSink、SubscriberSink、OnCompleteSink 等类。
没有文档说明为什么要删除它以及现在要采用哪种方法而不是这些类。
虽然在这个版本的文档中我仍然看到 UndefinedSource 的概念
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M4/scala/stream-io.html
我们使用 PartialFlowGraph 构建 Flow 的方式在 Constructing Sources, Sinks and Flows from Partial Graphs 中有详细解释,但是基本概念相当简单——我们可以在 Flow 中封装任意复杂的逻辑,只要它暴露相同的接口,这意味着只暴露一个 UndefinedSink 和一个 UndefinedSource,它们将被连接到 TCP 管道。在此示例中,我们使用 Concat 图处理阶段来注入初始消息,然后使用回显处理程序继续处理所有传入数据。您应该使用这种在 Flows 中封装复杂逻辑并将其附加到 StreamIO 的模式,以实现您的自定义和可能复杂的 TCP 服务器。
似乎是文档错误或现在需要一些额外的依赖项
jvm - 如何订阅在不同 JVM 上运行的反应式流实现?
假设我们有两个 Akka Stream 流,每个流都在自己的 JVM 上运行。
此示例在一个 JVM 上运行良好,但我如何订阅在不同 JVM 上运行的发布者?
我必须使用消息/队列中间件还是可以使用反应流 API 将两者连接在一起?
scala - 如何以编程方式从反应兔库中终止 io.scalac.amqp.Connection
我将akka流与reactive-rabbit库结合使用来构建一个脚本,将一些信息推送到我本地rabbitmq服务器上的交换中。
一旦信息被推送到队列中,我希望程序自行关闭。然而,Connection
保持程序活着,我找不到任何方法Connection
或其他如何杀死它的例子。不可避免地,我必须手动终止该进程。
我的代码看起来像这样:
这是我的 build.sbt 库依赖项中的一个片段:
这些一次性任务是否有更好的模式 - 例如您将回调传递给的临时连接?我见过的示例中的所有用例都是针对长时间运行的客户端,这些客户端一直运行到用户明确杀死它们为止。
提前致谢!
scala - 如何动态向 Source 添加元素?
我有示例代码来生成未绑定的源并使用它:
对象主要{
}
我想创建实现的类:
我需要将它与多个线程一起使用,例如:
和预期的完整代码:
如何实施MySources
?
scala - 将回调方法实现转换为 akka 流 Source
我正在与我无法控制的 java 库中的数据发布者合作。发布者库使用典型的回调设置;库代码中的某处(库是 java,但为了简洁起见,我将在 scala 中描述):
库的用户需要编写一个实现该onData
方法的类并将其传递给 a DataProducer
,库代码如下所示:
它DataProducer
有自己无法控制的内部线程,以及伴随的数据缓冲区,onData
每当有另一个DataType
对象要消耗时就会调用它。
所以,我的问题是:我如何编写一个层来将原始库模式转换/翻译成 akka 流Source对象?
先感谢您。
scala - 有没有办法使用 Akka-Stream 获得可预测的演员命名?
我正在使用 aActorPublisher
作为 Akka-Stream Source
。我不知道如何以可预测的方式命名输入参与者,以便我可以从我的应用程序的其他部分向它发送消息。我正在像这样实例化我的源:
当我实现流时,我得到一个ActorRef
,但它的路径是动态生成的,它只使用我提供的名称作为代码生成的以流为中心的命名方案的一部分。
有没有办法让这个前端演员源有一个明确的名字,还是我被困在传递 ActorRef ?
如果我不能明确命名它,这是否意味着您不能直接使用 Akka-Stream 进行远程处理?
编辑:我现在可以使用相对路径找到我的演员,但我仍然需要弄清楚如何命名我的Flow
,这样我才能了解相关演员的完整路径是什么。
编辑:(下面的 akka 版本信息,scala 2.11.6)
编辑: akka-user google 小组的友好人士启发了我,并建议处理这个问题的正确方法是传递调用本身的ActorRef
结果而不是使用. 如果我发现这种情况在未来发生变化,我会更新这个问题。谢谢阅读。runWith()
.actorSelection()
scala - 使用 Slick 3.0 Databasepublisher 的 Akka-http 流式传输
我正在使用 slick 3.0 并有一个 databasepublisher 对象作为
我正在使用 akka-http 作为休息层,如下所示,
我如何使用这个 databasepublisher 对象来转换(json)并将每一行流式传输到客户端。请帮忙。