问题标签 [alpakka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
151 浏览

apache-kafka - Alpakka Kafka 无法定义批处理

我正在尝试测试驱动 Alpakka Kafka 连接器至少一次批处理示例,我收到以下编译时错误

ConsumerMessage 类型未定义此处适用的 createCommittableOffsetBatch(ConsumerMessage.CommittableOffset)

并且 ConsumerMessage.CommittableOffsetBatch 类型未定义适用于此处的 updated(S, ConsumerMessage.CommittableOffset)

0 投票
3 回答
1331 浏览

akka-http - 如何使用 Akka HTTP 或 Alpakka 在 unix 域套接字上访问 REST API?

我想使用 /var/lib/docker.sock unix 域套接字访问 docker API。我已经看到可以使用(现代版本的)curl 调用 API 的示例,如下所示:

其中 REST 命令在 /containers/json 路径中表示。我很高兴看到 Alpakka Unix 域套接字适配器,但您似乎只能发送和接收原始字节。有什么优雅的方法可以做到这一点吗?还是我必须手动构建 HTTP 标头并手动管理所有困难的东西?

0 投票
1 回答
87 浏览

scala - 标签之间的 Alpakka XML 内容

Alpakka XML 处理流程允许逐个元素地读取 xml 文件。但是如何在特定数据StartElementEndElement包含数据之间提取StartElement数据?subslice不是一个选项,因为所需元素没有常量前缀。

0 投票
1 回答
274 浏览

scala - 如何将消息发布回 RabbitMQ,以便其他消费者可以使用?

我正在使用 Alpakka AMQP 库 ( https://developer.lightbend.com/docs/alpakka/current/amqp.html ) 来处理反应流中的 RabbitMQ 消息并将它们转储到 Kafka 中。

我们正在使用 Avro 和 Schema Registry,因此格式错误的消息无法通过验证,需要根据具体情况进行处理。我们想要的行为是应用程序死掉,并在 30 秒 - 1 分钟后重新启动,在此期间,我们能够使用 UI 将消息从队列中拉出以查看它有什么问题。

出于某种原因,我nack()似乎没有将消息释放回来,并且它保持在Unacked状态并且在释放之前无法查看。如何做到这一点?我将包含我的代码片段:

nack()是 a java.util.Future,所以只是为了测试我Await在它周围扔了一个以确保问题不是我在信号可以将它传递给 Rabbit 之前抛出错误。

0 投票
1 回答
138 浏览

java - 使用 Alpakka 使用来自 IBM JMS 队列的消息

IBM 队列 (insert_queue) 中有 100 条消息。我想消费 10 并存储在一个对象中并处理 10 个消费消息并等待一段时间,然后确认 10 个消费消息。

我的示例代码:

这不起作用

0 投票
1 回答
149 浏览

scala - SSLHandshakeException 在通过 Alpakka 将文件上传到 AWS S3 期间发生

我正在尝试设置一个 Alpakka S3 用于文件上传。这是我的配置:

alpakka s3 依赖:

这里是application.conf

文件上传代码示例:

当我运行此代码时,我得到:

javax.net.ssl.SSLHandshakeException: General SSLEngine problem

可能是什么原因?

0 投票
2 回答
1820 浏览

scala - How does slick profile work with respect to slick DB

I am having trouble to understand how to use slick profile.

My problem:

I am trying to use Slick with Akka-stream via the Alpakka JDBC plugin. The example given online is as follows:

The issue is that it works with

implicit val session = SlickSession.forConfig("slick-h2")

I try to use slick session as follows:

Where my pp config is as such:

This code breaks at runtime.

However in another code, where I do not use Akka-Stream and therefore do not use slickSession as such

the code works perfectly.

I concluded that this has to do with Database.forconfig("pp") and SlickSession.forConfig("slick-h2") that require 2 different things.

There is an explanation about profile in Slick website but that is not very easy to understand and provides little instruction. It does not list the available profile and their syntax.

Hence my question is, what's the difference between the two forConfig. How the profile works, where are they needed. Why the two configuration files for the database are not dealt with the same way.

Finally and formost, what is the profile for oracle. In slick 3.2.3 Slick is now free. I could not find the profile for it as in profile = "slick.jdbc.H2Profile$"

Can someone help clarify the configuration file difference, what is expected, what the profile does, and what is the profile for Oracle ?

0 投票
1 回答
881 浏览

scala - Alpakka S3 连接器流不会处理负载,抛出 akka.stream.BufferOverflowException

我有一个 akka-http 服务,我正在尝试使用alpakka s3 连接器来上传文件。以前我使用的是临时文件,然后使用 Amazon SDK 上传。这种方法需要对 Amazon SDK 进行一些调整以使其更像 scala,但它甚至可以同时处理 1000 个请求。吞吐量并不惊人,但所有请求最终都通过了。这是更改前的代码,没有 alpakka:

```

```

当我将其更改为使用 alpakka 连接器时,代码看起来好多了,因为我们可以将ByteSourcealpakka 和 alpakka连接Sink在一起。然而,这种方法无法处理如此大的负载。当我一次执行 1000 个请求(10 kb 文件)时,只有不到 10% 的请求通过,其余的则失败并出现异常:

akka.stream.alpakka.s3.impl.FailedUpload:超过配置的最大打开请求值 [32]。这意味着这个池的请求队列(HostConnectionPoolSetup(bargain-test.s3-eu-west-3.amazonaws.com,443,ConnectionPoolSetup(ConnectionPoolSettings(4,0,5,32,1,30 seconds,ClientConnectionSettings(Some (User-Agent: akka-http/10.1.3),10秒,1分钟,512,None,WebSocketSettings(,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$4787/1279590204@ 4d809f4c),List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,Map(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0 , 用户代理 -> 32),false,true,akka.util。ConstantFun$$$Lambda$4534/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda$4534/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda$4535/297570074@6b426c59),无,TCPTransport),新,1 second),akka.http.scaladsl.HttpsConnectionContext@7e0f3726,akka.event.MarkerLoggingAdapter@74f3a78b))) 已完全填满,因为池当前处理请求的速度不足以处理传入的请求负载。请稍后重试请求。看 请稍后重试请求。看 请稍后重试请求。看 http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html 了解更多信息。

以下是 Gatling 测试的摘要:

---- 响应时间分布 ---------------------------------------- t < 800毫秒 0 ( 0%)

800 毫秒 < t < 1200 毫秒 0 ( 0%)

t > 1200 毫秒 90 ( 9%)

失败 910 ( 91%)


当我同时执行 100 个请求时,有一半会失败。因此,仍然接近令人满意。

这是一个新代码:```

```

可以在此处查看具有两个端点的完整代码

我有一些问题。

1)这是一个功能吗?这就是我们所说的背压吗?

2)如果我希望这段代码表现得像带有临时文件的旧方法(没有失败的请求并且所有请求都在某个时候完成),我该怎么办?我试图为流实现一个队列(链接到下面的源代码),但这没有任何区别。代码可以在这里看到。

(* 免责声明 * 我仍然是 scala 新手,试图快速理解 akka 流并找到解决该问题的方法。这段代码很可能存在一些简单的错误。* 免责声明 *)

0 投票
0 回答
418 浏览

akka - Alpakka - Akka-Http failure while downloading file with latest S3 Connector

we are using alpakka latest S3 connector for downloading stream but it fails with max-length error as exceeded content length limit (8388608 bytes)! You can configure this by setting `akka.http.[server|client].parsing.max-content-length` or calling `HttpEntity.withSizeLimit` before materializing the dataBytes stream. ,stream as Source (downloading file from S3) and flow which calls to AMQP server using java amqp-client library , when we stream file less that 8M it process but larger file did not process and throw error highlighted above.

  • Akka-http does not keep file in memory it streams directly from source , do we need to first keep file in memory and then stream it?

  • Is Downstream i.e AMQP which is java client libary(5.3.0) is givig
    this issue , we are using One connection and one channel for Rabbit
    MQ?

    /li>
0 投票
1 回答
452 浏览

apache-kafka - Alpakka/Kafka - 分区消耗速度比其他分区快

我一直在使用 alpakka kafka 从 kafka 主题流式传输数据。我在用着:

最近,我尝试在一个有 15 个分区的主题上向更多的消费者(如 3)发送垃圾邮件。当我插入更多具有相同组 id 的消费者时,它友好地为每个消费者拆分 5 个分区,但它似乎不会同时消耗所有分区,似乎是一个接一个地读取,或者读取特定分区比其他分区快得多.

这是我正在运行的生产应用程序的真实示例。所以我有一些问题:

可以比其他分区快得多地读取某些分区吗?

请注意,这种行为仅在我启动多个消费者时才会发生。

我应该改变我的消费方式吗?我应该使用每个分区的源,还是有不同的选择?

更新

我怀疑插入多个消费者(读取多个应用程序)时可能会发生这种情况,但是今天只使用一个消费者就发生了,您可以通过查看消费者组来看到,这是相同的。

在此处输入图像描述

在它发生的时候,我还有 20MM 的消息等待处理(滞后)。上图是我们公司的Kafka经理拍的。