问题标签 [alpakka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
77 浏览

scala - 测量 Alpakka 流的内存消耗、吞吐量和时间

我想测量处理来自 Kafka 队列的数百万条记录的吞吐量和时间。我已经编写了 Alpakka 程序来执行此操作,但现在我想获得一些关于它的指标。

我尝试使用sbt-jmh但不成功。它使程序崩溃了,而且还因为 akka-stream 在没有明确关闭的情况下不会终止,它不会吐出报告。

有没有人尝试使用sbt-jmh分析 akka-stream/alpakka 系统?我感兴趣的主要指标是吞吐量、总耗时和总内存消耗。

0 投票
1 回答
593 浏览

java - 使用 Alpakka 连接器的多个消费者线程

我正在使用 Alpakka kafka 连接器来消耗来自 kafka 的数据包。我使用 Consumer 作为 CommittableSource。我想在一台机器上创建多个消费者线程并将它们用作单一来源。我怎样才能做到这一点?

目前,我使用 Consumer.CommittableSource 创建了多个源,并使用“合并”功能将所有源合并为一个源。但我不确定这是否是正确的方法,因为我没有创建线程。

请在下面找到我当前使用的源代码:

0 投票
1 回答
211 浏览

java - 在 MongoSink 响应后向 kafka 消费者提交 - alpakka mongo 连接器

我正在使用 alpakka 连接器来使用来自 Kafka 的数据包并将它们插入到 Mongo db 中。在得到 Mongo db 的响应后,我试图提交偏移量,但找不到任何相同的东西。如何确保只有在数据包成功插入 Mongodb 后才会提交偏移量?

我使用 Consumer.CommittableSource 作为源,使用 MongoSink 作为接收器,并使用 RunnableGraph 运行流。请参阅代码以获得更多说明。

资源:

流动:

下沉:

图形:

编辑

使用 PassThroughFlow,向 Mongo 的插入工作正常,它没有给出任何异常或错误,但仍然无法提交数据包。transformationCommit() 函数从未被调用过。

更新流程:

下沉:

传递流:

图形:

0 投票
1 回答
1642 浏览

scala - 为什么日志记录不适用于 Akka Stream

我正在使用 Alpakka,下面有玩具示例:

此代码正在运行。但我在记录时遇到问题。带有属性的第一部分是日志记录很好。当元素进入时,它会将日志记录到标准输出。但是当 LogRotatorSink 完成并且未来完成时,我想将 DONE 打印到标准输出。这是行不通的。正在生成文件,因此进程正在运行,但没有向标准输出发送“DONE”消息。

请问如何将“DONE”输出到标准输出?

0 投票
1 回答
39 浏览

scala - MongoDB 通过模式匹配获取正确的参考

我正在尝试使用https://doc.akka.io/docs/alpakka/current/mongodb.html api 将记录取决于数据类型插入 mongodb。
我们先来看看数据类型:

对于MsgDocsum 类型的每个居民,我将创建他们自己的注册表,因为我想存储在不同的集合中

然后让我们尝试在 mongo db 中插入一条记录:

作为该insertOne方法的参数,我想传递preFailureCollproceedColl取决于数据类型(MsgPreFailureMsgProceed)。在上面的示例中,它是MsgPreFailure,那么它应该执行以下调用

问题是,如何在insertOne方法内进行模式匹配以获取正确的引用?

0 投票
1 回答
69 浏览

scala - 未找到:来自注册表的值

我正在尝试使用https://doc.akka.io/docs/alpakka/current/mongodb.html如下:

编译器抱怨:

我错过了什么?该项目可以在这里找到https://gitlab.com/playscala/trymongo

0 投票
0 回答
144 浏览

apache-kafka - 在 kafka 流处理期间执行高效的外部系统查询

我想使用来自 Kafka 主题的事件,将其注入数据库,执行一些查询,从数据库中删除事件并将查询结果生成回主题。

这看起来像在 Kafka 流处理期间在外部系统查询中询问的内容

例如,我可以对空间数据库执行地理定位查询,我将首先从接收到的消息中插入坐标,执行查找以计算某个邻域,从数据库中删除坐标并向下游转发包含邻域的结果消息。

我可以使用转换器来执行查询并将丰富的消息转发到下游,如上一个链接中的第一个解决方案中所建议的那样。但是,在执行此操作时,我对 KafkaStream 存在一些性能问题,因为它会导致对 Db 进行每个事件的查询。

  • 我可以使用像窗口流这样的流模式来批量处理多个坐标,并且每批只执行一个查询吗?
  • 如果发生数据库故障,KafkaStreams 将如何反应?
  • KafkaStream 线程会被卡住吗?
  • 流应用程序会失败吗?
  • 一旦数据库恢复,KafkaStream 线程会很好地恢复吗?
  • 这种设计的极端案例是什么?
  • 开发一个外部服务来
    使用 KafkaConnect 或 alpakka 执行数据库查询并从一个主题读取请求并将响应写入另一个主题会更好吗?
0 投票
1 回答
518 浏览

java - 将 KafkaAvroDeserializer 与 Alpakka 一起使用

我有一个 SchemaRegistry 和一个 KafkaBroker,我使用 Avro v1.8.1 从中提取数据。对于反序列化,我一直在使用 Confluent 的KafkaAvroDeserializer。现在我打算重构我的代码以使用 Alpakka 提供的Elasticsearch API,但不幸的是,这会破坏反序列化,因为它会导致 NullPointerExceptions:

线程 "main" org.apache.kafka.common.errors.SerializationException 中的异常:在偏移量 0 处反序列化分区 topic-0 的键/值时出错。如果需要,请寻找过去的记录以继续消费。引起:org.apache.kafka.common.errors.SerializationException:为 id 2 反序列化 Avro 消息时出错 引起:io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116) 处的 java.lang.NullPointerException io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 在 org.apache.kafka.common.serialization.Deserializer.在 org.apache.kafka.clients.consumer 反序列化(Deserializer.java:58)。

我一直在使用 Alpakka 的 ConsumerSettings API,如本示例中所述:

这些设置会导致 NullPointerExceptions,而这个香草 Kafka Consumer 道具工作正常:

在工作示例中,ConsumerRecords 的值成功反序列化为 AvroMavenPlugin 从模式生成的类。

任何提示表示赞赏!

0 投票
0 回答
257 浏览

apache-kafka - Alpakka Kafka:模式注册表的序列化异常中断了流

我试图弄清楚如何处理带有错误 avro 消息的异常。我目前正在

可以看出,这打破了流。我无法在决策者中处理此问题,因为它是消费者来源的一部分。在文档中,它说我应该将流作为原始字节读取,并在处理链中进一步的 Flow 阶段手动进行解析。但是,如果我使用 Schema 注册表,我认为这是不可能的。

有人可以提示我处理此问题的正确方法是什么吗?

谢谢

0 投票
1 回答
243 浏览

scala - 强制 alpakka kafka 消费者在反序列化错误时显示错误消息

Alpakka kafka 消费者处理记录,直到遇到无法反序列化并静默死亡而不留下错误消息的记录。如何强制它报告错误信息?