问题标签 [dstream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 积累 Spark Streaming DStream 的最佳方案
我正在寻找在 Spark DStream 中累积最后 N 条消息的最佳解决方案。我还想指定要保留的消息数。
例如,给定以下流,我想保留最后 3 个元素:
到目前为止,我正在研究 DStream 上的以下方法:
- updateStateByKey:鉴于所有消息都具有相同的密钥,我可以这样做。但看起来有点奇怪,为什么这需要知道密钥。
- mapWithState:Scala 中的 API 对于这么简单的事情来说太乏味了
- window: 似乎没有做这项工作,它还需要一个时间值来进行窗口化,而不是最后一个元素数
- 累加器:尚未真正使用Spark 文档中的累加器
实现这一目标的最佳解决方案是什么?
scala - 在火花流中,foreach和foreachRDD有什么区别
例如,将如何
不同于
注意这里x
是a DStream
。
python - Spark Streaming:从 DStream 到 Pandas Dataframe
在下面的代码片段中,我尝试将温度 DStream(从 Kafka 接收)转换为 pandas Dataframe。
照原样,从未计算过平均值,我想这是因为“df”不是熊猫数据框(?)。
我尝试df = spark.createDataFrame(df.toPandas())
根据相关文档使用,但编译器无法识别“toPandas()”,并且从未发生转换。
我是否走在正确的道路上,如果是,我应该如何应用转型?
或者也许我的方法是错误的,我必须以不同的方式处理 DStream?
先感谢您!
scala - 在 Spark Streaming 中计算派生值
我有两个 org.apache.spark.streaming.dstream.DStream[Int] 类型的键值对。
第一个键值对是(单词,频率)。第二个键值对是(行数,值)。
我想将频率除以每个单词的值。但是,我低于错误 值/不是 org.apache.spark.streaming.dstream.DStream[Int] 的成员
示例代码:
f 是单词的频率,c 是 rdd 有单词和频率的总数
apache-spark - 如何从字符串列表创建 DStream?
我有一个字符串列表,但我找不到将列表更改为火花流 DStream 的方法。我试过这个:
但是eclipse说sparkContext不是sqlContext的成员,所以,我该怎么做呢?感谢您的帮助,请。
spark-streaming - DStream 批处理上的 Spark Streaming 容错
假设如果在时间 X 接收到流。假设我的批处理持续时间是 1 分钟。现在我的执行者正在处理第一批。但是这个执行需要 3 分钟直到 X+3。但是在 X+1 和 X+2 我们收到另外两批。这是否意味着在 X+1 我的第一批丢失了?还是它存储在我的记忆中并且仍在处理中?
scala - 火花流 if(!rdd.partitions.isEmpty) 不工作
我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流为空(if(!rdd.partitions.isEmpty)
),我已经包含了一个捕获;但是,即使没有针对 kafka 主题发布任何事件,else
也从未达到过声明。
是否有替代语句我应该使用KafkaUtils.createDirectStream
而不是检查流是否为空createStream
?
scala - 将 spark dStream 与变量合并到 saveToCassandra()
我有一个DStream[String, Int
带有成对字数的 ],例如("hello" -> 10)
. 我想用步长索引将这些计数写入 cassandra。该索引被初始化为var step = 1
并随着每个微批处理的处理而递增。
cassandra 表创建为:
尝试将流写入表时...
......我明白了java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step
。
如何将step
索引添加到流中以便将三列一起写入?
我正在使用 spark 2.0.0、scala 2.11.8、cassandra 3.4.0 和 spark-cassandra-connector 2.0.0-M3。
scala - 对 DStream 执行多重转换
我是相当新的 Spark Streaming
我有包含两个值 x y 的流数据。例如
1 300
2 8754
3 287
等等
在流数据中,我想得到最小的 y 值、最大的 y 值和 x 值的平均值。这需要输出如下(使用上面的示例):
287 8754 4
我已经能够在单个变换/减少上计算这些值,但无法使用单个变换
下面是我当前的代码
apache-spark - DStream 的 RDD 是否一次性提取为批处理间隔创建的全部数据?
我已经完成了这个stackoverflow 问题,根据答案它只为批处理间隔创建了DStream
一个。RDD
例如:
我的批处理间隔为 1 分钟,Spark Streaming 作业正在使用来自 Kafka 主题的数据。
我的问题是,DStream 中可用的 RDD 是否提取/包含最后一分钟的全部数据?我们需要设置任何标准或选项来提取最后一分钟创建的所有数据吗?
如果我有一个带有 3 个分区的 Kafka 主题,并且所有 3 个分区都包含最后一分钟的数据,那么 DStream 是否会提取/包含所有 Kafka 主题分区中最后一分钟创建的所有数据?
更新:
在哪种情况下 DStream 包含多个 RDD?