问题标签 [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
814 浏览

java - Spark流式减少多键Java

我对 Spark Streaming 很陌生,我一直在试图弄清楚如何处理这个问题,因为我发现了很多关于单 (K,V) 对的例子,但还有更多。为了找到使用 Spark 与 Java 的转换的最佳方法,我将不胜感激。

让我简单描述一下场景,

目标是获得一组元素在一个时间窗口内的错误率。

给定以下输入,

它将按元素聚合,然后按状态聚合(Element, (Number of Success, Number of Error))。在这种情况下,转换的结果将是,

最后,使用 (i1,i2) -> i1/(i1+i2) 等函数进行比率计算。

据我了解,结果将由reduceByKeyAndWindow()函数给出,例如,

按照应用程序的反向流程,我的问题是,

如何在具有多个值或键的 JavaPairDStream 上定义一对(可能类似于JavaPairDStream<String, Tuple2<Integer,Integer>>)?

reduceFunc对于给定的具有多个键的对,哪种方法最好?

哪个是映射初始 DStream 的最佳方式(可能类似于JavaDStream<Tuple2<String, String>> line = input.map(func))?

预先感谢您的帮助。

0 投票
1 回答
302 浏览

java - Spark DStream的foreachDD函数中RDD的并发转换

在下面的代码中,函数 fn1 和 fn2 似乎以顺序方式应用于 inRDD,正如我在 Spark Web UI 的 Stages 部分中看到的那样。

以这种方式运行流式作业时有何不同。以下函数是否在输入 Dstream 上并行运行?

0 投票
1 回答
347 浏览

scala - 如何在 Apache Spark 中使用 DStream 进行特征提取

我有从 Kafka 通过 DStream 到达的数据。我想执行特征提取以获得一些关键字。

我不想等待所有数据的到达(因为它旨在成为可能永远不会结束的连续流),所以我希望分块执行提取 - 对我来说准确性是否会受到一点影响并不重要。

到目前为止,我整理了类似的内容:

然而,我收到了java.lang.IllegalStateException: Haven't seen any document yet.——我并不感到惊讶,因为我只是试着把东西拼凑在一起,而且我明白,由于我不等待一些数据的到来,所以当我尝试在数据上使用生成的模型时,它可能是空的。

解决这个问题的正确方法是什么?

0 投票
1 回答
1851 浏览

java - Spark将JavaPairDStream流式传输到文本文件

我对 Spark 流媒体很陌生,我一直在保存我的输出。

我的问题是,如何将 JavaPairDStream 的输出保存在文本文件中,该文件仅使用 DStream 中的元素为每个文件更新?

例如,对于 wordCount 示例,

我会使用以下输出wordCounts.print()

我想将最后几行写入一个文本文件,每批都会用wordCounts.

我尝试了以下方法,

这会生成一堆目录,每批都有几个无意义的文件。

另一种方法是,

我会很感激一些帮助。

谢谢

0 投票
0 回答
72 浏览

apache-spark - 火花流 kafka 并行接收器接收数据不平衡

我只想尝试从 kafka 并行接收流数据。这是我的代码:

我在 yarn 上运行代码,numReceiver 是 5,并且代码确实有效。但问题是只有两个接收器接收数据。num-executor 是 5,executor-core 是 2

结果:

0 投票
1 回答
3566 浏览

apache-spark - 当调用 pprint 时,pyspark 中的转换后的 DStream 会出错

我正在通过 PySpark 探索 Spark Streaming,当我尝试将transform函数与take.

我可以成功地使用sortByDStreamtransformpprint结果。

但是如果我使用take相同的模式并尝试pprint它:

工作失败了

您可以在此处的笔记本中查看完整的代码和输出。

我究竟做错了什么?

0 投票
1 回答
556 浏览

scala - 在 Spark 流式传输作业中调用实用程序(外部)

我有一个来自 Kafka 的流式传输作业(使用createDstream)。它的“id”流

我有一个实用程序或 api,它接受一个 id 数组并进行一些外部调用并接收一些信息,每个 id 都说“t”

我想DStream在调用实用程序时保留 Dstream。我不能在 Dstream rdd 上使用地图转换,因为它会调用每个 id,而且该实用程序正在接受 id 的集合。

如果我使用

我失去了DStream. 我需要保留DStream用于下游处理。

0 投票
1 回答
189 浏览

scala - Scala - 类似于 R 中的 Cbind 的 Spark Dstream 操作

1)我正在尝试使用 MLlib Random Forest 。我的最终输出应该有 2 列

我的特征集是训练数据和评分 --- train , score 但是当我训练和评分时,我删除了 id 字段,因为它不能用作特征,因为它对于每一行都是唯一的并且在预测方面没有智能,现在我得到了预测的分数

我的得分输出看起来像

但我想把它绑回 id

我在单独的 DStream 中有 id 字段,在单独的 DStream 中有 predict_value 字段。如何将它相互绑定,我没有任何列字段可以进行连接。

现在我该怎么把它系回来。例如 R 有一个函数 cbind 可以绑定来自不同数据帧的 2 列

是否有可能或任何替代方案?

2)我正在使用 MLlib 随机森林模型来预测使用火花流。最后,我想将特征 Dstream 和预测 Dstream 结合在一起进行进一步的下游处理。我怎样才能做到这一点?

提前致谢。

0 投票
1 回答
1701 浏览

pyspark - Dstream 上的 Pyspark 过滤器操作

我一直在尝试扩展网络字数,以便能够根据某些关键字过滤行

我正在使用火花 1.6.2

我已经尝试了所有的变化,

我几乎总是收到错误,我无法应用类似的功能

在 TransformedDStream 上 pprint/show/take/collect

. 我在 Dstream 行上使用了带有 foreachRDD 的转换,并带有一个函数来检查使用本机 python 字符串方法,这也失败了(实际上,如果我在程序中的任何地方使用 print,spark-submit 就会出现 - 没有报告错误。

我想要的是能够过滤传入的 Dstreams 关键字如“错误”| “警告”等并将其输出到标准输出或标准错误。

0 投票
1 回答
148 浏览

pyspark - Pyspark - 将控制权转移出 Spark 会话(sc)

这是一个后续问题

Dstream 上的 Pyspark 过滤器操作

为了计算一天、一小时内收到了多少错误消息/警告消息 - 一个人是如何设计这项工作的。

我试过的:

然而,这有多个问题,首先打印不起作用(不输出到标准输出,我已经阅读过它,我可以在这里使用的最好的东西是日志记录)。我可以将该函数的输出保存到一个文本文件并改为尾部该文件吗?

我不确定为什么程序刚刚出现,没有任何错误/转储可以进一步研究(spark 1.6.2)

如何保存状态?我正在尝试按服务器和严重性聚合日志,另一个用例是通过查找某些关键字来计算处理了多少事务

我想尝试的伪代码:

发送或打印字典的最后一部分需要切换出火花流上下文 - 有人可以解释一下这个概念吗?