问题标签 [alpakka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
67 浏览

scala - 弹性搜索在一个索引中删除多种类型

One of our application was designed such a way that we have 7 indices each per day (7 days worth), and each indices has mapping has mutliple type say userId, but the fields are all the same per type. It was designed long back (ES2.x) version. Initially the userId were just 2/3 so we never had a issue. But recently the application has been opened up for mutliple user ID (in thousands) and we have to remove multiple types(also will be moving to ES 6.x soon)

Can anyone please suggest what is the best way to go about it? We cannot have multiple indices based on userID (then we will end up with thousands of indices). Is there a way we can design our index such that filtering based on usedId is simple?

0 投票
0 回答
144 浏览

scala - 您可以使用 alpakka HDFS 连接器附加到文件吗?

我正在尝试使用此连接器从 Kafka 中提取消息并将它们写入 HDFS。只要文件不存在就可以正常工作,但如果存在则抛出 FileAlreadyExistsException。有没有办法使用这个连接器附加到一个已经存在的文件?我正在使用一个HdfsFlow.dataWithPassThrough流程,它需要一个HdfsWritingSettings, 但这只允许您设置一个overwrite布尔值;没有append选择。

0 投票
1 回答
350 浏览

sockets - Alpakka UDP:如何通过已经绑定的套接字响应接收到的数据报?

我正在使用 Alpakkas UDP.bindFlow将传入的 UDP 数据报转发到 Kafka 代理。发送这些数据报的旧应用程序需要来自消息发送到的同一端口的 UDP 响应。我正在努力为这种行为建模,因为它需要我将流的输出连接到它的输入。

我尝试了这个解决方案,但它不起作用,因为响应数据报是从不同的源端口发送的:

0 投票
0 回答
116 浏览

slick - 如何在 akka 流 alpakka slick 中控制获取大小

Slick 的文档警告说,对于某些数据库,必须设置提取大小。但是总的来说,我想知道如何通过 akka-stream-alpakka-slick 集成来控制 fetchSize?我们无法直接访问 DB Action,因此该怎么做?

0 投票
0 回答
304 浏览

scala - Slick 结果集大小默认限制

我想知道在使用光滑的 akka-streaming 集成执行 DBAction 时是否有默认的结果集大小。

确实(我正在使用akka-stream slick)。当我编写以下查询时:

如在

我的查询最多返回 400 条记录,然后就挂在那里。我在该表中有大约 5k 条记录。这正常吗?

但是,我已经能够通过使用以下语句来检索它们,尽管我相信它比它应该的要慢:

Slick.source(sql"""select * FROM DRUG Where ROWNUM <= 1000000000""".as[(Map[String,String])])

顺便说一下,我正在使用的数据库是 oracle。因此,我想知道是 oracle、slick 还是 akka-stream 集成导致了这种情况。

对此有什么建议吗?

0 投票
2 回答
369 浏览

amazon-s3 - Alpakka s3`multipartUpload`不上传文件

我有一个关于alpakka_kafka+alpakka_s3集成的问题。当我使用 alpakka kafka 源时, Alpakka s3multipartUpload似乎没有上传文件。

但是,只要我.take(100)在 kafkaSource 之后添加。一切正常。

任何帮助将不胜感激。提前致谢!

这是完整的代码片段:

0 投票
0 回答
144 浏览

scala - 主管未捕获 Alpakka JsonReader 子流中的异常

我有一个源,它使用 aflatMapConcat来合并通过解析 Json 生成的新元素流。为此,我使用了 Alpakka 提供的 JsonReader ( https://developer.lightbend.com/docs/alpakka/current/data-transformations/json.html )。这很好用,但我想处理将无效 Json 呈现给读者的情况。在这种情况下,我希望异常冒泡到主管,以便它可以重新启动流。

这是一个展示错误 json 案例的快速片段:

我希望 JsonReader 在抛出异常时失败,然后将异常冒泡到routeErrorHandler. 然而,当运行 JsonReader 阶段失败并且异常被吞下。我还尝试recover在流中放置一个以将异常抛出到“更高”级别(在子流之外),但这也不会被 routeErrorHandler 主管或放置在 ActorMaterializer 上的主管处理。

这是预期的行为吗?如果是这样,确保子流中的异常到达主管策略的正确方法是什么?

0 投票
1 回答
106 浏览

alpakka - alpakka cassandrasource 从 cassandra 连续读取数据

我们正在做一些 POC 以使用 Alpakka CassandraSource 连续读取 cassandra 表。以下是示例代码:

上面的代码从 emp1 表中获取行。由于该表不断增长,我们需要在数据可用时继续阅读。有什么方法可以在 CassandraSource 中设置连续读取?

0 投票
0 回答
639 浏览

akka-stream - Akka File Streaming 抛出错误为 akka.http.scaladsl.model.EntityStreamException:实体流截断

我们正在从 S3 流式传输文件并对其进行处理,在处理完成后,我们将文件作为错误/存档文件上传回 S3,而从 S3 流式传输文件则流式传输数据,并且在两者之间停止处理,错误为“akka.http.scaladsl.model. EntityStreamException:实体流截断”,不确定这取决于来自 S3 的文件大小流还是损坏的文件?

} }

0 投票
1 回答
246 浏览

apache-kafka - 使用 Alpakka 将 1 个输入连接到 n 个输出

我正在尝试将生产者连接到消费者的一些变体,在特殊情况下,有时我需要为每条消息生成 1 条额外消息(例如,1 条到输出主题,1 条消息到不同的主题),同时保持保证那。

我正在考虑做 mapConcat 并输出多个 ProducerRecord 对象,我担心边缘情况下的松散保证,即第一条消息足以在该偏移量上发生提交,从而导致第二条消息的潜在丢失。此外,您似乎不能只执行 .flatmap ,因为您将进入图形 API,这会变得更加混乱,因为一旦您合并到提交流程中,您不会忽略重复的偏移量变得更加困难.

原始的 1 对 1 文档在这里:https ://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-producer-and-consumer

有没有人想到/解决了这个问题?