问题标签 [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
493 浏览

scala - 积累 Spark Streaming DStream 的最佳方案

我正在寻找在 Spark DStream 中累积最后 N 条消息的最佳解决方案。我还想指定要保留的消息数。

例如,给定以下流,我想保留最后 3 个元素:

到目前为止,我正在研究 DStream 上的以下方法:

  1. updateStateByKey:鉴于所有消息都具有相同的密钥,我可以这样做。但看起来有点奇怪,为什么这需要知道密钥。
  2. mapWithState:Scala 中的 API 对于这么简单的事情来说太乏味了
  3. window: 似乎没有做这项工作,它还需要一个时间值来进行窗口化,而不是最后一个元素数
  4. 累加器:尚未真正使用Spark 文档中的累加器

实现这一目标的最佳解决方案是什么?

0 投票
1 回答
925 浏览

scala - 在火花流中,foreach和foreachRDD有什么区别

例如,将如何

不同于

注意这里x是a DStream

0 投票
0 回答
1049 浏览

python - Spark Streaming:从 DStream 到 Pandas Dataframe

在下面的代码片段中,我尝试将温度 DStream(从 Kafka 接收)转换为 pandas Dataframe。

照原样,从未计算过平均值,我想这是因为“df”不是熊猫数据框(?)。

我尝试df = spark.createDataFrame(df.toPandas())根据相关文档使用,但编译器无法识别“toPandas()”,并且从未发生转换。

我是否走在正确的道路上,如果是,我应该如何应用转型?

或者也许我的方法是错误的,我必须以不同的方式处理 DStream?

先感谢您!

0 投票
1 回答
89 浏览

scala - 在 Spark Streaming 中计算派生值

我有两个 org.apache.spark.streaming.dstream.DStream[Int] 类型的键值对。

第一个键值对是(单词,频率)。第二个键值对是(行数,值)。

我想将频率除以每个单词的值。但是,我低于错误 值/不是 org.apache.spark.streaming.dstream.DStream[Int] 的成员

示例代码:

f 是单词的频率,c 是 rdd 有单词和频率的总数

0 投票
1 回答
2213 浏览

apache-spark - 如何从字符串列表创建 DStream?

我有一个字符串列表,但我找不到将列表更改为火花流 DStream 的方法。我试过这个:

但是eclipse说sparkContext不是sqlContext的成员,所以,我该怎么做呢?感谢您的帮助,请。

0 投票
1 回答
78 浏览

spark-streaming - DStream 批处理上的 Spark Streaming 容错

假设如果在时间 X 接收到流。假设我的批处理持续时间是 1 分钟。现在我的执行者正在处理第一批。但是这个执行需要 3 分钟直到 X+3。但是在 X+1 和 X+2 我们收到另外两批。这是否意味着在 X+1 我的第一批丢失了?还是它存储在我的记忆中并且仍在处理中?

0 投票
1 回答
1031 浏览

scala - 火花流 if(!rdd.partitions.isEmpty) 不工作

我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流为空(if(!rdd.partitions.isEmpty)),我已经包含了一个捕获;但是,即使没有针对 kafka 主题发布任何事件,else也从未达到过声明。

是否有替代语句我应该使用KafkaUtils.createDirectStream而不是检查流是否为空createStream

0 投票
3 回答
583 浏览

scala - 将 spark dStream 与变量合并到 saveToCassandra()

我有一个DStream[String, Int带有成对字数的 ],例如("hello" -> 10). 我想用步长索引将这些计数写入 cassandra。该索引被初始化为var step = 1并随着每个微批处理的处理而递增。

cassandra 表创建为:

尝试将流写入表时...

......我明白了java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step

如何将step索引添加到流中以便将三列一起写入?

我正在使用 spark 2.0.0、scala 2.11.8、cassandra 3.4.0 和 spark-cassandra-connector 2.0.0-M3。

0 投票
1 回答
373 浏览

scala - 对 DStream 执行多重转换

我是相当新的 Spark Streaming

我有包含两个值 x y 的流数据。例如

1 300

2 8754

3 287

等等

在流数据中,我想得到最小的 y 值、最大的 y 值和 x 值的平均值。这需要输出如下(使用上面的示例):

287 8754 4

我已经能够在单个变换/减少上计算这些值,但无法使用单个变换

下面是我当前的代码

0 投票
3 回答
841 浏览

apache-spark - DStream 的 RDD 是否一次性提取为批处理间隔创建的全部数据?

我已经完成了这个stackoverflow 问题,根据答案它只为批处理间隔创建了DStream一个。RDD

例如:

我的批处理间隔为 1 分钟,Spark Streaming 作业正在使用来自 Kafka 主题的数据。

我的问题是,DStream 中可用的 RDD 是否提取/包含最后一分钟的全部数据?我们需要设置任何标准或选项来提取最后一分钟创建的所有数据吗?

如果我有一个带有 3 个分区的 Kafka 主题,并且所有 3 个分区都包含最后一分钟的数据,那么 DStream 是否会提取/包含所有 Kafka 主题分区中最后一分钟创建的所有数据?

更新:

在哪种情况下 DStream 包含多个 RDD?