问题标签 [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1028 浏览

scala - 并非所有的 akka 流接收器都接收到发出的数据

运行以下 akka 流式 FlowGraph 时,并非所有发出的字符都被所有接收器接收。

运行后生成以下输出

第二个接收器没有收到第 8 个、第 9 个和第 10 个值:IJK,但整个流程仍然结束。

我应该怎么做才能等待两个接收器消耗所有数据?我发现如果我更改(x1,x2)=>x1(x1,x2)=>x2this 将等待。这与在第一个接收器中休眠 300 毫秒相同。

0 投票
0 回答
698 浏览

scala - Converting Play Enumerator to reactive stream Publisher

I can get enumerator from mongodb, using reactivemongo like,

#xA;

How to stream from this enumerator, using akka-http and akka-stream ?

I have converted the enumerator to publisher using, play streams experimental library,

#xA;

and stream from, akka-http like,

#xA;

But, the stream does not ending, the client is still waiting for the data. Is there any issue while creating enumerator ?. Or the issue in converting the Enumerator to publisher ?

I have already posted the question in reactivemongo google user group and play user group https://groups.google.com/forum/#!topic/reactivemongo/uObGXFQFH0Y

0 投票
1 回答
1779 浏览

akka - 如何将多个演员作为源附加到 Akka 流?

我正在尝试构建和运行一个 akka 流(在 Java DSL 中),其中有 2 个演员作为源,然后是一个合并结点,然后是 1 个接收器:

我的问题是如何获取对源演员的 ActorRef 引用以便向他们发送消息?如果有 1 个演员,我不会使用图形生成器,然后 .run() 或 runWith() 方法将返回 ActorRef 对象。但是如果有很多源演员怎么办?甚至有可能实现这样的流程吗?

0 投票
1 回答
606 浏览

java - 如何使用 Akka Streams for java 创建和使用 Pairs 流?

我正在尝试使用Akka 流的 javadsl 变体,当我尝试定义应该采用Pair元素的 Flow 时,我似乎遇到了问题。

比如说,我定义了一个 Flow,它接受传入的字符串并将其与一个布尔值配对,说明字符串的长度是否为 5 或更大:

这工作正常。但下一步是我定义一个流,它接受一对字符串和布尔值并再次返回字符串:

我不确定是否应该使用 .of(Pair.class) 方法创建 pairToString 流,因为我不知道是否应该输入传入的 Pair 以及在哪里输入(例如Pair<String, Boolean>

非常感谢任何帮助和/或指针!

0 投票
1 回答
1441 浏览

akka-stream - 如何创建具有不同输入和输出类型的流以在图形内部使用?

我正在通过在内部构建图表来制作自定义水槽。这是我的代码的广泛简化,以证明我的问题:

我遇到的问题是,虽然创建具有相同输入/输出类型且只有一个类型参数且没有值参数的流是有效的,例如:(Flow[Int]在整个文档中),但仅提供两个是无效的类型参数和零值参数。

根据Flow 对象的参考文档,apply正在寻找的方法定义为

并说

通过将 FlowGraph.Builder 传递给给定的创建函数来创建流。

create 函数应返回一对 Inlet 和 Outlet,它们对应于创建的 Flows 输入和输出端口。

当我尝试制作我认为非常简单的流程时,似乎我需要处理另一个级别的图形构建器。有没有一种更简单、更简洁的方法来创建一个 Flow 来改变它的输入和输出的类型,而不需要弄乱它的内部端口?如果这是解决此问题的正确方法,那么解决方案会是什么样子?

奖励:为什么很容易制作一个不改变其输出类型的流?

0 投票
1 回答
301 浏览

stream - 有没有办法用 Akka 流将 Path 元素流展平为内容行流?

我正在尝试创建一个处理 Path 元素的 javadsl Flow。它应该发出与 Path 元素关联的文件的内容行。换句话说,我认为我需要将 Path 元素流展平为 String 元素流。

Flow 确实有一个flatten方法,但它涉及使用FlattenStrategy,我不确定如何在我的案例中使用它。

任何帮助是极大的赞赏!

Edit1:所以我的理解是,使用 StreamReader 读取文件并在阅读器到达“\n”时发出一个新字符串可能是个好主意。所以现在的问题是如何从一个单一的转换方法发出多个元素。像这样

这可能吗?

0 投票
1 回答
2212 浏览

scala - 端到端反应式流式处理 RESTful 服务(又名 Back-Pressure over HTTP)

我一直试图在网上澄清这个问题一段时间没有成功,所以我会尝试在这里问它。

我想找到一些资源或示例来展示我如何构建端到端完全背压的 REST 服务 + 客户端。我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他),我将拥有(并且能够“可视化”)在整个过程中处理的背压REST 服务器构建,例如使用 Akka-Http。

需要明确的是,我正在寻找类似以下谈话的内容(但我找不到幻灯片或视频来确认它): http: //oredev.org/2014/sessions/reactive-streaming-restful-applications-with-akka -http

我对我看到的大多数示例的怀疑是关于我可以找到很多 REST 服务(服务器)在后端使用 Akka Http 和 Akka 流的情况,但我不确定背压是否通过 HTTP “通信”和 REST,如果客户端正在实现 Reactive Streams。在这种情况下,我会通过 TCP/HTTP 桥接一个“流”还是只有 2 个独立的流?这是我主要的怀疑和困惑。

希望我足够清楚,有人能够对此事有所了解。
无论如何,谢谢!

0 投票
1 回答
60 浏览

akka - 是否可以在 javadsl 中为物化 Akka 源引入更多值?

我正在尝试使用 Akka 的 javadsl 设置一个流,该流处理文件夹中新文件的处理。我的问题是:

每次引入新文件时我是否应该重新运行 RunnableFlow,

或者是否可以在流等待将新文件引入所述流的源时保持 RunnableFlow 无限期运行?

我现在的来源:

Queue 是一个简单的可迭代队列

0 投票
2 回答
2187 浏览

akka - 在 Akka-Streams 中拆分流

我正在尝试提出一种解决方案,将我收到的传入字符串拆分为多个字符串。我一直在研究,看起来在以前版本的 Akka-Streams 中有一个类Transformer可以扩展来进行这种转换。

在我使用的版本(RC2)中有Stages 但我不确定如何实现拆分模式。

我正在寻找XXXXX允许我输入 aString并返回一个序列String并将每个序列发送到流的其余部分的组件。

0 投票
2 回答
1330 浏览

akka - 将基于 ack 的 Actor 与 akka-stream 集成

我有一个被设计用于 akka-io acking 的 Actor,这样它在向上游(到网络)发送消息时会等待 Ack。这个actor是后端异步应用程序的接口。

我想要一个包装层,它允许我将此 Actor 转换为 akka-streams Flow[Incoming, Outgoing, ???],以便它可以与期望这种签名的较新库集成。

(来自上游的消息很少,所以我们不太关心那里的背压,但拥有它并不是一件坏事。)

我从 akka-user 邮件列表中得到了很好的授权,在 akka-streams 中没有将 Actor 与流集成的代码,并且为了将 Actor 插入 Stream 并保留基于 Ack 的背压,必须实现 推拉阶段

看来我们实际上需要两个PushPullStages ......一个 forupstream => SimpleActor和一个 for SimpleActor => upstream

我的问题是:

  1. 是否有任何库提供诸如演员和流之间的集成?
  2. PushPullStage有没有比从头开始实现双向更简单的方法?
  3. 是否有任何现有的测试框架可以允许对这样的实现进行压力测试?