问题标签 [spark-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - JavaDStream print() 函数不打印
我是 Spark 流媒体的新手。我按照此链接中的教程进行操作:https ://spark.apache.org/docs/latest/streaming-programming-guide.html
当我运行代码时,我可以看到正在处理该行,但我看不到带有时间戳的输出。
我只能看到这个日志:
另外我试图用 forEachRDD 函数调用保存最后一个 DStream,数据没有被存储。如果有人可以帮助我,那将是一个很大的帮助..
cassandra - 在 Apache Spark 中将批处理 RDD 的结果与流式 RDD 相结合
上下文: 我正在使用 Apache Spark 从日志中汇总不同事件类型的运行计数。日志存储在 Cassandra 中用于历史分析目的和 Kafka 中用于实时分析目的。每个日志都有一个日期和事件类型。为简单起见,假设我想跟踪每天单一类型的日志数量。
我们有两个 RDD,一个来自 Cassandra 的批处理数据 RDD,另一个来自 Kafka 的流式 RDD。伪代码:
问题:
如何将 streamRDD 的结果与 batchRDD 结合起来?
假设batchRDD
具有以下数据,并且该作业于 2014-10-16 运行:
由于 Cassandra 查询只包含到批量查询开始时间之前的所有数据,因此我们必须在查询完成时从 Kafka 读取,只考虑作业开始时间之后的日志。我们假设查询需要很长时间。这意味着我需要将历史结果与流结果结合起来。
举例说明:
然后假设在第一个流批次中我们得到了这个数据:
然后我想将批处理 RDD 与此流 RDD 组合,以便流 RDD 现在具有值:
然后假设在第二批流中我们得到了这个数据:
然后应该更新流 RDD 以具有以下值:
等等...
可以使用streamRDD.transformToPair(...)
a 将 streamRDD 数据与 batchRDD 数据结合起来join
,但如果我们对每个流块执行此操作,那么我们将为每个流块添加来自 batchRDD 的计数,从而使状态值“双重计数”,当它应该只添加到第一个流块中。
spark-streaming - Spark Streaming UpdateStateByKey
我正在运行 24X7 的 spark 流,并使用 updateStateByKey 函数来保存计算的历史数据,例如 NetworkWordCount 示例..
我试图流式传输具有 3lac 记录的文件,每 1500 条记录睡眠 1 秒。我正在使用 3 名工人
- 过一段时间updateStateByKey在增长,那么程序抛出如下异常
错误执行程序:任务 ID 1635 java.lang.ArrayIndexOutOfBoundsException 中的异常:3
如何处理?我想 updateStateByKey 应该随着它的快速增长而定期重置,请分享一些关于何时以及如何重置 updateStateByKey 的示例。或者我还有其他问题吗?一些启发。
任何帮助深表感谢。谢谢你的时间
apache-spark - 为什么火花流很慢?
我使用了来自 github 存储库的 spark 流示例程序,并尝试使用 kafka 和自定义接收器。在这两种情况下,我都会在 20-30 秒后得到输出。在自定义接收器代码中,我立即获取数据,但输出需要 20-30 秒。我在单个节点上运行此代码。
我做错了什么还是有优化,我需要执行还是因为我在单个节点上运行。
如果有人可以指导我这将是一个很大的帮助。
我使用了 spark 存储库中的代码,代码如下:
spark-streaming - 带有 updateStateByKey 问题的 Spark 24X7 流式传输
我正在运行 24/7 的火花流并使用 updateStateByKey 是否可以 24/7 运行火花流?如果是,updateStateByKey 不会变大,如何处理?当我们 24/7 运行时,我们是否必须定期重置/删除 updateStateByKey 如果不是如何以及何时重置它?还是 Spark 以分布式方式处理?如何动态地创建内存/存储。
当 updateStateByKey 增长时,我收到以下错误
如何处理这个..如果有任何文档请指出我?我完全被卡住了,非常感谢您的帮助..感谢您的宝贵时间
twitter - 使用 Spark Streaming 持久化推文
首先,我们的要求相当简单。当推文进来时,我们需要做的就是将它们保存在 HDFS 上(定期)。
JavaStreamingContext 的“检查点”API 看起来很有希望,但经过进一步审查,它似乎服务于不同的目的。(另外,我不断收到 '/checkpoint/temp, error: No such file or directory (2)' 错误,但我们暂时不用担心)。
问题:JavaDStream 没有“saveAsHadoopFiles”方法——这有点道理。我想从流式作业保存到 Hadoop 不是一个好主意。
推荐的方法是什么?我是否应该将传入的“推文”写入 Kafka 队列,然后使用诸如“Camus”(https://github.com/linkedin/camus)之类的工具推送到 HDFS?
scala - 如何从 Spark Streaming 中的 DStream 中删除 RDD?
我想从 DStream 中删除前 n 个 RDD。我尝试将以下函数与转换一起使用,但它不起作用(错误 OneForOneStrategy:org.apache.spark.SparkContext java.io.NotSerializableException),我认为它不会实现我删除 RDD 的真正目标因为它会返回空的。
scala - 为什么使用 updateStateByKey 时任务大小一直在增长?
我编写了一个与 updateStateByKey 一起使用的简单函数,以查看问题是否是由于我的 updateFunc 引起的。我想这一定是由于其他原因。我在 --master local[4] 上运行它。
一段时间后,有警告,并且任务大小不断增加。
WARN TaskSetManager:Stage x 包含一个非常大的任务(129 KB)。建议的最大任务大小为 100 KB。
WARN TaskSetManager:Stage x 包含一个非常大的任务(131 KB)。建议的最大任务大小为 100 KB。
spark-streaming - Spark Shell 找不到 Hbase 类
我正在尝试使用 Spark Streaming 将数据从 HDFS 加载到 Hbase 表。我将数据放在 HDFS 目录运行时并使用 textFileStream 函数读取它。由于 spark 在类路径中没有 hbase jar,即使在 spark shell 中导入 Hbase jar 时,它也会给我一个错误。
scala - Spark:如何将 PartialFunction 传递给 DStream?
我正在尝试将部分函数传递给通过滑动窗口在 DStream 批处理中捕获的所有 RDD 的联合。假设我在离散为 1 秒批次的流上构建了一个超过 10 秒的窗口操作:
我window
将有 K 个 RDD。我想collect(f: PartialFunction[T, U])
在所有 K 个这些 RDD 的联合上使用。++
我可以使用调用联合运算符foreachRDD
,但我想返回一个RDD
not aUnit
并避免副作用。
我正在寻找的是一个减速器
DStream
我可以像这样使用:
但这在 Spark Streaming API 中不可用。
有没有人有任何好的想法可以将流中捕获的 RDD 组合成单个 RDD,以便我可以传入部分函数?还是为了实现我自己的 RDD 减速器?也许这个功能会在随后的 Spark 版本中出现?