问题标签 [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
747 浏览

pyspark - Convert Dstream to dataframe using pyspark

How i can convert a DStream to an dataframe? here is my actual code

it gives me the following error:

0 投票
0 回答
561 浏览

scala - spark流中的foreachRDD和transform有什么区别?

要使用 RDD 操作,我们可以使用foreachRDD()ortransorm()但我不明白它们之间有什么区别。

0 投票
1 回答
445 浏览

apache-spark - sortByKey 不适用于 Dstream

我正在使用 Dstream(Spark Streaming) 的 Transform API 对数据进行排序。我正在使用 netcat 从 TCP 套接字读取数据。遵循使用的代码行:myDStream.transform(rdd=>rdd.sortByKey())

找不到函数 sortByKey。任何人都可以请帮助这一步中的问题是什么?

0 投票
0 回答
70 浏览

scala - dstream[Double] 到 dstream scala

我正在开发 spark-consumer 应用程序,它正在使用来自 kafka 代理的消息,我想找到将要触发消费者的消息的平均值,最后我想将该平均值存储到 cassandra 中。

在上面的代码中,我得到了总数并准确地计算了我期望的值,但是我无法计算平均值,因为计数是 dstream[long] 并且总数是 dstream[double]。

我认为这条线有一些问题。“val avg = total.reduce((total,count) => total / count)” 任何帮助表示赞赏。

输出:计数: 这是我在流中作为 dstream[Long] 获得的计数输出 总计: 这是我在与 dstream[Double] 相同的流中获得的总输出

0 投票
1 回答
746 浏览

python - 使用 Python 进行 Spark Streaming:针对特定属性连接两个流

我正在接收两个套接字流 S1 和 S2 分别具有模式 S1 和 S2。

我想使用火花流加入关于属性“a”的 S1 和 S2。以下是我的代码:

S1 和 S2 都用逗号分隔。

虽然上面的代码执行了连接,但是相对于完整的行。

我有兴趣加入关于特定属性的两个流,在本例中为属性“a”。我怎样才能做到这一点?

非常感谢!

0 投票
1 回答
645 浏览

java - 如何从 JavaStreamingContext 生成 JavaPairInputDStream?

我正在学习 Apache Spark 流式传输并尝试JavaPairInputDStreamJavaStreamingContext. 下面是我的代码:

但是我的应用程序的最后一行抛出了这个异常:

queueStream(Queue<JavaRDD<T>>, boolean)类型中的方法JavaStreamingContext不适用于参数 ( Queue<JavaPairRDD<String,String>>, boolean)

我不知道如何使用 JavaStreamingContext 生成 JavaPairInputDStream。

0 投票
1 回答
885 浏览

scala - 如何使用 scala 在 spark 中合并多个 DStream?

我有来自 Kafka 的三个传入流。我解析作为 JSON 接收的流并将它们提取到适当的案例类并形成以下模式的 DStream:

我想加入这三个基于公共列的 DStream 即crt_object_id. 所需的 DStream 应采用以下形式:

请告诉我一种方法来做同样的事情。我对 Spark 和 Scala 都很陌生。

0 投票
1 回答
41 浏览

solaris - 解压缩压缩的 dstream 包

如何.dstream.Z在 Sun Solaris 中解压缩软件包。我尝试了以下所有方法

枪弹

解压

cpio

在这里我不得不按Ctrl+C,因为没有打印出来。

此外,我检查了文件命令:

0 投票
1 回答
193 浏览

apache-spark - Spark 是否将 Kafka 分区中的数据读取到执行程序中,以获取排队的批次?

在使用 streaming-kafka-0-8-integration Direct Approach 进行火花流式传输期间,如果批次正在排队,执行程序会将排队批次的数据拉入他们的内存中吗?如果不是,那么长期积压的批次有什么害处?

0 投票
1 回答
488 浏览

apache-spark - 从一系列离线事件中模拟 PySpark 中的 RDD Dstream

我需要将在线 Kafka 流式传输期间保存到 HDFS 的事件注入回 DStream PySpark 以进行相同的算法处理。我发现了 Holden Karau 的代码示例,它“等同于像 Kafka 这样的可检查点、可重放、可靠的消息队列”。我想知道是否可以在 PySpark 中实现它: