问题标签 [scala-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
247 浏览

scala - Scala 分组 Stream 元素而不评估整个 Stream

如果我有这样的排序流:

Stream(1, 1, 2, 2, 2, 3, 4, 4, 5)

如何将其内容分组如下:

Stream(List(1, 1), List(2, 2, 2), List(3), List(4, 4), List(5))

没有立即对流进行完全评估?

0 投票
2 回答
4078 浏览

scala - 火花流检查点

我正在使用 Spark Kafka 直接流式传输来自 Kafka 的消息。我想实现零消息丢失,重新启动 spark 后,它必须从 Kafka 读取丢失的消息。我正在使用检查点来保存所有读取偏移量,以便下次 spark 将从存储的偏移量开始读取。这是我的理解。

我使用了下面的代码。我停止了我的火花,向卡夫卡推送了一些信息。重新启动未从 Kafka 读取丢失消息的火花后。Spark 读取来自 kafka 的最新消息。如何读取来自 Kafka 的遗漏消息?

注意:应用程序日志显示 auto.offset.reset 为none而不是latest。为什么 ?

SBT

Windows 7的

0 投票
1 回答
813 浏览

scala - Spark Kafka Streaming CommitAsync 错误

我是 Scala 和 RDD 概念的新手。在 Spark 中使用 Kafka 流 api 从 kafka 读取消息并尝试在业务工作后提交。但我收到错误。

注意:使用重新分区进行并行工作

如何从流 APi 中读取偏移量并将其提交给 Kafka?

scalaVersion := "2.11.8" val sparkVersion = "2.2.0" val connectorVersion = "2.0.7" val kafka_stream_version = "1.6.3"

代码

错误

java.io.NotSerializableException:org.apache.spark.streaming.dstream.TransformedDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。这是为了避免 Spark 任务因不必要的对象而膨胀。在 org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1 的 org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525)。 apply(DStream.scala:512) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512) at org.apache.spark.util.Utils$.tryOrIOException(Utils .scala:1303) 在 org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:

0 投票
2 回答
1979 浏览

scala - Spark Kafka Streaming 多分区 CommitAsync 问题

我正在阅读来自具有多个分区的 Kafka 主题的消息。从消息中读取没有问题,在将偏移范围提交给 Kafka 时,出现错误。我尽力了,但无法解决这个问题。

代码

错误

0 投票
1 回答
694 浏览

scala - Spark 流在 VM 中部署的独立集群中不起作用

我已经使用 Scala 编写了 Kafka 流程序并在 Spark 独立集群中执行。代码在我的本地运行良好。我已经在 Azure VM 中完成了 Kafka、Cassandra 和 Spark 设置。我已打开所有入站和出站端口以避免端口阻塞。

开始大师

sbin>./start-master.sh

启动奴隶

sbin# ./start-slave.sh spark://vm-hostname:7077

我已经在 Master WEB UI 中验证了这个状态。

提交作业

bin#./spark-submit --class xyStreamJob --master spark://vm-hostname:7077 /home/user/appl.jar

我注意到应用程序添加并显示在 Master WEB UI 中。

我已经向主题发布了几条消息,并且没有收到消息并将其保存到 Cassandra DB。

我单击主 Web 控制台上的应用程序名称,并注意到该应用程序控制台页面中没有 Streaming 选项卡

为什么应用程序不能在 VM 中运行而在本地运行良好?

如何调试 VM 中的问题?

版本

0 投票
2 回答
102 浏览

scala - 在 Scala 中懒惰地用几个元素创建一个 Stream

仅出于测试目的,我想懒惰地计算 2 个元素:

但是运行此代码不会产生预期的结果。这些值似乎同时出现在输出中。原因是Stream()构造函数采用了一个必须急切计算的数组。所以要解决这个问题,我不得不像这样手动创建流的 thunk:

现在完全按照预期工作,但不是特别漂亮。

有没有一种更简洁、更方便的方法来让语法上类似于Stream(a, b, c)但懒惰地评估参数?

谢谢

0 投票
0 回答
631 浏览

scala - 使用猫流scala读取大文件

读取一个大文本文件(40M+ 行)并对这个列表进行一些操作并将输出写入新文件。

例如:调用 Web 服务并使用响应与此列表进行联合或交集(重复此过程数百次)

使用 scala 使用猫或 scala 流库以功能方式实现此功能的最佳方法是什么(没有 OOM 问题)?

  1. 分块读取数据
  2. 对当前列表进行操作(联合或交集)
  3. 写入新文件
0 投票
1 回答
550 浏览

scala - 使用 scala fs2 文件流从文件中删除过滤后的行

如何使用fs2filtered从当前流文件中删除行并获取过滤行数作为返回类型?

例如:如果old.txt包含由换行符 (\n) 分隔的字符串:

val myList = List("chen","yval")

0 投票
1 回答
139 浏览

scala - Scala 中的 F# seq monad 等价物是什么

我正在尝试从 F# 迁移到 Scala。在 F# 中,我们可以轻松地创建带有计算表达式或 monad 的 seq。例如:

我阅读了有关 scala的信息Stream,但我不确定如何正确使用它,例如上面的示例,其中包含一些在 seq 生成期间不断更新的状态。

另一个例子是在 seq 中做一些初始化和清理工作:

我们可以在scala中做到这一点吗?

0 投票
3 回答
133 浏览

scala - 如何在流中找到两个连续且相同的值?

如何在 a 中找到两个连续且相同的值Stream并返回此“重复值”:

例如Stream(1, 2, 4, 3, 5, 5, 2, 2)会导致5.

如何做到这一点?