问题标签 [scala-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Scala 分组 Stream 元素而不评估整个 Stream
如果我有这样的排序流:
Stream(1, 1, 2, 2, 2, 3, 4, 4, 5)
如何将其内容分组如下:
Stream(List(1, 1), List(2, 2, 2), List(3), List(4, 4), List(5))
没有立即对流进行完全评估?
scala - 火花流检查点
我正在使用 Spark Kafka 直接流式传输来自 Kafka 的消息。我想实现零消息丢失,重新启动 spark 后,它必须从 Kafka 读取丢失的消息。我正在使用检查点来保存所有读取偏移量,以便下次 spark 将从存储的偏移量开始读取。这是我的理解。
我使用了下面的代码。我停止了我的火花,向卡夫卡推送了一些信息。重新启动未从 Kafka 读取丢失消息的火花后。Spark 读取来自 kafka 的最新消息。如何读取来自 Kafka 的遗漏消息?
注意:应用程序日志显示 auto.offset.reset 为none而不是latest。为什么 ?
SBT
Windows 7的
scala - Spark Kafka Streaming CommitAsync 错误
我是 Scala 和 RDD 概念的新手。在 Spark 中使用 Kafka 流 api 从 kafka 读取消息并尝试在业务工作后提交。但我收到错误。
注意:使用重新分区进行并行工作
如何从流 APi 中读取偏移量并将其提交给 Kafka?
scalaVersion := "2.11.8" val sparkVersion = "2.2.0" val connectorVersion = "2.0.7" val kafka_stream_version = "1.6.3"
代码
错误
java.io.NotSerializableException:org.apache.spark.streaming.dstream.TransformedDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。这是为了避免 Spark 任务因不必要的对象而膨胀。在 org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1 的 org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525)。 apply(DStream.scala:512) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512) at org.apache.spark.util.Utils$.tryOrIOException(Utils .scala:1303) 在 org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:
scala - Spark Kafka Streaming 多分区 CommitAsync 问题
我正在阅读来自具有多个分区的 Kafka 主题的消息。从消息中读取没有问题,在将偏移范围提交给 Kafka 时,出现错误。我尽力了,但无法解决这个问题。
代码
错误
scala - Spark 流在 VM 中部署的独立集群中不起作用
我已经使用 Scala 编写了 Kafka 流程序并在 Spark 独立集群中执行。代码在我的本地运行良好。我已经在 Azure VM 中完成了 Kafka、Cassandra 和 Spark 设置。我已打开所有入站和出站端口以避免端口阻塞。
开始大师
sbin>./start-master.sh
启动奴隶
sbin# ./start-slave.sh spark://vm-hostname:7077
我已经在 Master WEB UI 中验证了这个状态。
提交作业
bin#./spark-submit --class xyStreamJob --master spark://vm-hostname:7077 /home/user/appl.jar
我注意到应用程序添加并显示在 Master WEB UI 中。
我已经向主题发布了几条消息,并且没有收到消息并将其保存到 Cassandra DB。
我单击主 Web 控制台上的应用程序名称,并注意到该应用程序控制台页面中没有 Streaming 选项卡。
为什么应用程序不能在 VM 中运行而在本地运行良好?
如何调试 VM 中的问题?
版本
scala - 在 Scala 中懒惰地用几个元素创建一个 Stream
仅出于测试目的,我想懒惰地计算 2 个元素:
但是运行此代码不会产生预期的结果。这些值似乎同时出现在输出中。原因是Stream()
构造函数采用了一个必须急切计算的数组。所以要解决这个问题,我不得不像这样手动创建流的 thunk:
现在完全按照预期工作,但不是特别漂亮。
有没有一种更简洁、更方便的方法来让语法上类似于Stream(a, b, c)
但懒惰地评估参数?
谢谢
scala - 使用猫流scala读取大文件
读取一个大文本文件(40M+ 行)并对这个列表进行一些操作并将输出写入新文件。
例如:调用 Web 服务并使用响应与此列表进行联合或交集(重复此过程数百次)
使用 scala 使用猫或 scala 流库以功能方式实现此功能的最佳方法是什么(没有 OOM 问题)?
- 分块读取数据
- 对当前列表进行操作(联合或交集)
- 写入新文件
scala - 使用 scala fs2 文件流从文件中删除过滤后的行
如何使用fs2filtered
从当前流文件中删除行并获取过滤行数作为返回类型?
例如:如果old.txt
包含由换行符 (\n) 分隔的字符串:
和val myList = List("chen","yval")
。
scala - Scala 中的 F# seq monad 等价物是什么
我正在尝试从 F# 迁移到 Scala。在 F# 中,我们可以轻松地创建带有计算表达式或 monad 的 seq。例如:
我阅读了有关 scala的信息Stream
,但我不确定如何正确使用它,例如上面的示例,其中包含一些在 seq 生成期间不断更新的状态。
另一个例子是在 seq 中做一些初始化和清理工作:
我们可以在scala中做到这一点吗?
scala - 如何在流中找到两个连续且相同的值?
如何在 a 中找到两个连续且相同的值Stream
并返回此“重复值”:
例如Stream(1, 2, 4, 3, 5, 5, 2, 2)
会导致5
.
如何做到这一点?