问题标签 [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
245 浏览

apache-spark - 通过键 Spark Scala 加入 Dstream[Document] 和 Rdd

这是我的代码:

有没有办法加入这两个?

0 投票
0 回答
120 浏览

python-3.x - 通过检查点连接 DStream 和 RDD

我一直在努力执行 DStream 和 RDD 之间的连接。设置场景:

  • 火花 - 2.3.1
  • 蟒蛇 - 3.6.3

RDD

我正在从 CSV 文件中读取 RDD,拆分记录并生成一对 RDD。

这是来自的输出sku_prices.collect()

数据流

我正在阅读来自 Kafka 的 DStream。

当我运行pprint()时,orders我得到如下所示的输出:

加入

现在我想将itemsDStream加入sku_pricesRDD。我知道我不能直接加入,但我的阅读表明我可以使用transform()DStream 上的方法来完成这项工作。所以这就是我所拥有的:

我期待得到一个看起来像这样的 DStream:

Spark 文档建议这应该有效并且确实有效:结果正是我得到的!:)

检查点

但是我也想做一个有状态的操作,所以我需要引入检查点。

只需在以下位置添加检查点就会导致此错误transform()

您似乎正在尝试广播 RDD 或从操作或转换中引用 RDD。RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。

该线程上的答案表明检查点和外部 RDD 不能混合使用。有没有解决的办法?当 StreamingContext 启用了检查点时,是否可以加入 DStream 和 RDD?

谢谢,安德鲁。

0 投票
1 回答
176 浏览

apache-spark - Spark Streaming 在 DStream 上异步运行操作

我正在编写一个数据摄取程序。从 Kafka 读取到 DStream 将 Dstrem 拆分为 3 个流并在每个流上执行操作:

如果可以在整个 DStream 而不是 RDD 上触发异步提交。

0 投票
1 回答
326 浏览

scala - 带有 spark-testing-base 的 Scala Spark Streaming 单元测试抛出错误

我试图使用spark-testing-base对我的 spark 流代码运行单元测试。而且我在运行他们的示例代码时遇到了麻烦。

这是我复制的代码片段

进口 com.holdenkarau.spark.testing.SharedSparkContext 进口 org.scalatest.FunSuite

这是错误的堆栈跟踪。

我包括我的 build.sbt 不确定这是否有帮助。

是否有任何其他工具,建议使用 DStream 进行测试?

0 投票
1 回答
254 浏览

apache-spark - Spark QueueStream 永不耗尽

对我从互联网上借来的用于研究目的的一段代码感到困惑。这是代码:

我正在追踪它只是为了检查并注意到“你好”被永远打印出来:

我原以为 queueStream 会在 3 次迭代后耗尽。

那么,我错过了什么?

0 投票
1 回答
138 浏览

apache-spark - foreachRDD 中的案例类导致序列化错误

如果我不尝试使用案例类并简单地让列的默认名称使用 toDF() 或者如果我通过 toDF("c1, "c2") 分配它们,我可以在 foreachRDD 中创建一个 DF。

当我尝试使用案例类并查看示例后,我得到:

如果我改变 Case Class 语句,我会得到:

这是遗留问题,但我很好奇 Spark 可以产生的第 n 个序列化错误,以及它是否会延续到结构化流中。

我有一个不需要拆分的 RDD,可能是这个问题吗?不。在 DataBricks 中运行?

编码如下:

0 投票
1 回答
165 浏览

apache-spark - Kafka - Spark Streaming 集成:DStreams 和任务重用

我试图了解 Spark Streaming(不是 Structured Streaming)的内部结构,特别是任务查看 DStream 的方式。我将在 scala 中查看 Spark 的源代码,这里。我了解调用堆栈:

我了解 DStream 确实是 RDD 的哈希图,但我试图了解任务查看 DStream 的方式。我知道 Kafka Spark 集成基本上有两种方法:

  • 基于使用高级 Kafka 消费者 API 的接收器

    在这里, Receiver任务在每个批处理间隔(例如 5 秒)创建一个新的(微)批处理,其中包含 5 个分区(=> 1 秒块间隔),并将下游交给常规任务。

    问题:考虑我们的示例,每个微批次每 5 秒创建一次;恰好有 5 个分区,并且所有微批次的所有这些分区都应该以完全相同的方式在下游进行 DAG 处理,是相同的常规任务一遍又一遍地重复用于每个微批次 (RDD) 的相同分区 id 作为长时间运行的任务?例如

    如果在时间T0将分区(P1,P2,P3,P4,P5)的ubatch1分配给任务 id (T1, T2, T3, T4, T5),则分区( P1',P2',P3',P4 ',P5')在时间T5也被分配给同一组任务(T1、T2、T3、T4、T5)或者是否会为ubatch2创建新任务(T6、T7、T8、T9、T10)

    如果是后者,那么当您已经知道有任务在做完全相同的事情并且可以在长时间运行时重复使用时,必须每 5 秒通过网络向执行程序发送一次新任务不是性能密集型的吗?任务?

  • 直接使用低级 Kafka 消费者 API

    这里一个 Kafka 分区映射到一个 Spark 分区,因此映射到一个任务。同样,考虑主题t的 5 个 Kafka 分区,我们得到 5 个 Spark 分区及其对应的任务。

    问题:假设T0的ubatch1有分区(P1,P2,P3,P4,P5)分配给任务(T1,T2,T3,T4,T5)。在时间T5的 ubatch2分区( P1',P2',P3',P4',P5')是否也被分配给同一组任务(T1, T2, T3, T4, T5)或新任务(T6, T7、T8、T9、T10)ubatch2创建?

0 投票
2 回答
290 浏览

apache-spark - Spark DStream中基于消息时间戳构造窗口

我正在从 Kafka 接收 DStream,我想通过按键将所有消息分组在某个滑动窗口中。

关键是这个窗口需要基于每条消息中提供的时间戳(单独的字段):

所以,我想考虑每个键的消息timestamp of the first message- timestamp of the last message<= 5 分钟

正如我从这个问题中看到的那样,这是不可行的,因为 Spark 只计算事件的系统时间。那边的人建议使用updateStateByKey,这对我来说不是很清楚......

也许我们可以使用另一种方法来实现这一点?

如果在函数中包含时间戳的差异,combinerscombineByKey通过持续时间阈值进一步求和和过滤呢?

如果您有机会遇到同样的问题,请添加您的想法,或者分享您的解决方案......

谢谢!

0 投票
2 回答
3167 浏览

apache-spark - Spark Streaming 调整每批大小的记录数不起作用?

我的 spark 流应用程序正在使用 DStream 方法从 kafka 中读取数据,我试图让批处理大小在 10 秒内处理 60,000 条消息。

我所做的,

  • 创建了一个包含 3 个分区的主题
  • spark.streaming.kafka.maxRatePerPartition = 60000
  • spark.streaming.backpressure.enabled = true
  • 创建 StreamingContext 时将批处理持续时间设置为 10 秒
  • 以 2 个执行器在纱线模式下运行(3 个分区共 4 个核心)

现在我如何测试它是否有效。

我有一个生产者一次向该主题发送 60,000 条消息。当我检查 spark UI 时,我得到以下信息:

所以每批时间间隔 10 秒。我期望的是 1 批有 60,000 条记录。还有其他一些我没有设置的参数吗?从我读到的关于我目前设置的内容来看,我应该在一个批次中获得 10 * 60,000 * 3 = 1800000。

下面是我使用打印出来的

我删除了一些不必要的日志,如服务器地址、应用程序名称等。

我还有一些正在打印的 Kafka 配置,所以我也会在下面发布这些配置。

0 投票
1 回答
160 浏览

java - 具有两个值的键的最佳实践

到目前为止,我有一个 JavaDStream,它最初看起来像这样:

首先,我拆分了行并将其映射到 JavaPairDStream 中的键值对:

所以我得到了这个:

最后,输出应该是这样的

它计算每个键的独特水果和国家的数量。

现在最好的做法是什么?首先 groupByKey/reduceByKey 然后再拆分?或者是否可以像这样的键值对中的每个键有两个值?: