问题标签 [spark-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
1653 浏览

apache-spark - JavaDStream print() 函数不打印

我是 Spark 流媒体的新手。我按照此链接中的教程进行操作:https ://spark.apache.org/docs/latest/streaming-programming-guide.html

当我运行代码时,我可以看到正在处理该行,但我看不到带有时间戳的输出。

我只能看到这个日志:

另外我试图用 forEachRDD 函数调用保存最后一个 DStream,数据没有被存储。如果有人可以帮助我,那将是一个很大的帮助..

0 投票
2 回答
7384 浏览

cassandra - 在 Apache Spark 中将批处理 RDD 的结果与流式 RDD 相结合

上下文: 我正在使用 Apache Spark 从日志中汇总不同事件类型的运行计数。日志存储在 Cassandra 中用于历史分析目的和 Kafka 中用于实时分析目的。每个日志都有一个日期和事件类型。为简单起见,假设我想跟踪每天单一类型的日志数量。

我们有两个 RDD,一个来自 Cassandra 的批处理数据 RDD,另一个来自 Kafka 的流式 RDD。伪代码:

问题: 如何将 streamRDD 的结果与 batchRDD 结合起来? 假设batchRDD具有以下数据,并且该作业于 2014-10-16 运行:

由于 Cassandra 查询只包含到批量查询开始时间之前的所有数据,因此我们必须在查询完成时从 Kafka 读取,只考虑作业开始时间之后的日志。我们假设查询需要很长时间。这意味着我需要将历史结果与流结果结合起来。

举例说明:

然后假设在第一个流批次中我们得到了这个数据:

然后我想将批处理 RDD 与此流 RDD 组合,以便流 RDD 现在具有值:

然后假设在第二批流中我们得到了这个数据:

然后应该更新流 RDD 以具有以下值:

等等...

可以使用streamRDD.transformToPair(...)a 将 streamRDD 数据与 batchRDD 数据结合起来join,但如果我们对每个流块执行此操作,那么我们将为每个流块添加来自 batchRDD 的计数,从而使状态值“双重计数”,当它应该只添加到第一个流块中。

0 投票
1 回答
867 浏览

spark-streaming - Spark Streaming UpdateStateByKey

我正在运行 24X7 的 spark 流,并使用 updateStateByKey 函数来保存计算的历史数据,例如 NetworkWordCount 示例..

我试图流式传输具有 3lac 记录的文件,每 1500 条记录睡眠 1 秒。我正在使用 3 名工人

  1. 过一段时间updateStateByKey在增长,那么程序抛出如下异常

错误执行程序:任务 ID 1635 java.lang.ArrayIndexOutOfBoundsException 中的异常:3

如何处理?我想 updateStateByKey 应该随着它的快速增长而定期重置,请分享一些关于何时以及如何重置 updateStateByKey 的示例。或者我还有其他问题吗?一些启发。

任何帮助深表感谢。谢谢你的时间

0 投票
2 回答
1548 浏览

apache-spark - 为什么火花流很慢?

我使用了来自 github 存储库的 spark 流示例程序,并尝试使用 kafka 和自定义接收器。在这两种情况下,我都会在 20-30 秒后得到输出。在自定义接收器代码中,我立即获取数据,但输出需要 20-30 秒。我在单个节点上运行此代码。

我做错了什么还是有优化,我需要执行还是因为我在单个节点上运行。

如果有人可以指导我这将是一个很大的帮助。

我使用了 spark 存储库中的代码,代码如下:

0 投票
3 回答
2441 浏览

spark-streaming - 带有 updateStateByKey 问题的 Spark 24X7 流式传输

我正在运行 24/7 的火花流并使用 updateStateByKey 是否可以 24/7 运行火花流?如果是,updateStateByKey 不会变大,如何处理?当我们 24/7 运行时,我们是否必须定期重置/删除 updateStateByKey 如果不是如何以及何时重置它?还是 Spark 以分布式方式处理?如何动态地创建内存/存储。

当 updateStateByKey 增长时,我收到以下错误

如何处理这个..如果有任何文档请指出我?我完全被卡住了,非常感谢您的帮助..感谢您的宝贵时间

0 投票
2 回答
660 浏览

twitter - 使用 Spark Streaming 持久化推文

首先,我们的要求相当简单。当推文进来时,我们需要做的就是将它们保存在 HDFS 上(定期)。

JavaStreamingContext 的“检查点”API 看起来很有希望,但经过进一步审查,它似乎服务于不同的目的。(另外,我不断收到 '/checkpoint/temp, error: No such file or directory (2)' 错误,但我们暂时不用担心)。

问题:JavaDStream 没有“saveAsHadoopFiles”方法——这有点道理。我想从流式作业保存到 Hadoop 不是一个好主意。

推荐的方法是什么?我是否应该将传入的“推文”写入 Kafka 队列,然后使用诸如“Camus”(https://github.com/linkedin/camus)之类的工具推送到 HDFS?

0 投票
2 回答
1213 浏览

scala - 如何从 Spark Streaming 中的 DStream 中删除 RDD?

我想从 DStream 中删除前 n 个 RDD。我尝试将以下函数与转换一起使用,但它不起作用(错误 OneForOneStrategy:org.apache.spark.SparkContext java.io.NotSerializableException),我认为它不会实现我删除 RDD 的真正目标因为它会返回空的。

0 投票
1 回答
826 浏览

scala - 为什么使用 updateStateByKey 时任务大小一直在增长?

我编写了一个与 updateStateByKey 一起使用的简单函数,以查看问题是否是由于我的 updateFunc 引起的。我想这一定是由于其他原因。我在 --master local[4] 上运行它。

一段时间后,有警告,并且任务大小不断增加。

WARN TaskSetManager:Stage x 包含一个非常大的任务(129 KB)。建议的最大任务大小为 100 KB。

WARN TaskSetManager:Stage x 包含一个非常大的任务(131 KB)。建议的最大任务大小为 100 KB。

0 投票
2 回答
8069 浏览

spark-streaming - Spark Shell 找不到 Hbase 类

我正在尝试使用 Spark Streaming 将数据从 HDFS 加载到 Hbase 表。我将数据放在 HDFS 目录运行时并使用 textFileStream 函数读取它。由于 spark 在类路径中没有 hbase jar,即使在 spark shell 中导入 Hbase jar 时,它也会给我一个错误。

0 投票
1 回答
1330 浏览

scala - Spark:如何将 PartialFunction 传递给 DStream?

我正在尝试将部分函数传递给通过滑动窗口在 DStream 批处理中捕获的所有 RDD 的联合。假设我在离散为 1 秒批次的流上构建了一个超过 10 秒的窗口操作:

window将有 K 个 RDD。我想collect(f: PartialFunction[T, U])在所有 K 个这些 RDD 的联合上使用。++我可以使用调用联合运算符foreachRDD,但我想返回一个RDDnot aUnit并避免副作用。

我正在寻找的是一个减速器

DStream我可以像这样使用:

但这在 Spark Streaming API 中不可用。

有没有人有任何好的想法可以将流中捕获的 RDD 组合成单个 RDD,以便我可以传入部分函数?还是为了实现我自己的 RDD 减速器?也许这个功能会在随后的 Spark 版本中出现?