问题标签 [dstream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Flatten 加入 DStream
我已经加入了一些 DStream,因此 DStream 的当前“数据类型”看起来像这样(键和值):
但我想得到:
或者
有什么功能可以在我的 DStream 上应用来转换它,或者我该怎么做?
提前致谢!
testing - 以编程方式在 apache spark 中创建 dstream
我正在围绕 Apache Spark Streaming 编写一些自包含的集成测试。我想测试我的代码是否可以在我的模拟测试数据中摄取各种边缘情况。当我使用常规 RDD(不是流式传输)执行此操作时。我可以使用我的内联数据并在其上调用“并行化”以将其转换为 spark RDD。但是,我找不到这样的方法来创建 destreams。理想情况下,我想偶尔调用一些“推送”函数,并让元组神奇地出现在我的 dstream 中。ATM 我正在使用 Apache Kafka 执行此操作:我创建了一个临时队列,然后写入它。但这似乎有点矫枉过正。我宁愿直接从我的测试数据创建 test-dstream,而不必使用 Kafka 作为中介。
scala - Apache Spark Scala API:Scala 中的 ReduceByKeyAndWindow
由于我是 Spark 的 Scala API 的新手,我遇到了以下问题:
在我的 java 代码中,我做了一个 reduceByKeyAndWindow 转换,但现在我看到,只有一个 reduceByWindow(因为 Scala 中也没有 PairDStream)。但是,我现在开始使用 Scala 的第一步:
直到步骤 5.1 一切正常。在 5.2 中,我现在想总结countPreparedPerPlug1h的 1,但前提是其他属性(房屋、家庭、插头)相等。- 目标是获得每个(房屋、家庭、插头)组合的条目计数。有人可以帮忙吗?谢谢!
编辑 - 第一次尝试
我在步骤 5.2 中尝试了以下操作:
但在这里我得到以下错误:
似乎我使用reduceByKeyAndWindow 转换错误,但错误在哪里?要汇总的值的类型是 Int,请参见上面步骤 5.1 中的 countPreparedPerPlug1h。
apache-spark - Spark:从单个 DStream 中获取多个 DStream
是否可以从 spark 中的单个 DStream 中获取多个 DStream。我的用例如下:我从 HDFS 文件中获取日志数据流。日志行包含一个 id (id=xyz)。我需要根据 id 以不同的方式处理日志行。所以我试图为输入 Dstream 中的每个 id 设置不同的 Dstream。我在文档中找不到任何相关内容。有谁知道如何在 Spark 中实现这一点或指向任何链接。
谢谢
apache-spark - Spark Streaming - Travis CI 和 GitHub 自定义接收器 - 连续数据但 RDD 为空?
最近,作为科学研究的一部分,我一直在开发一个应用程序,它使用 Travis CI 和 GitHub 的 REST API 流式传输(或至少应该)数据。这样做的目的是深入了解提交-构建关系,以便进一步执行大量分析。
为此,我实现了以下 Travis 自定义接收器:
而接收器使用我定制的 TRAVIS API 库(使用 Apache Async 客户端在 Java 中开发)。但是,问题如下:我应该收到的数据是连续的并且不断变化,即不断地推送到 Travis 和 GitHub。例如,考虑 GitHub 每秒大约记录的事实。350 个事件 - 包括推送事件、提交评论和类似事件。
但是,当流式传输 GitHub 或 Travis 时,我确实从前两批中获取数据,但随后,DStream 的 RDD 部分为空 - 尽管有数据要流式传输!
到目前为止,我已经检查了几件事,包括用于省略对 API 的请求的 HttpClient,但它们都没有真正解决这个问题。
因此,我的问题是 - 会发生什么?为什么在周期 x 过去后 Spark 不流式传输数据。您可以在下面找到设置的上下文和配置:
提前致谢!
scala - Dstream 上的 combineByKey 引发错误
(String, Int)
我有一个包含元组的dstream
当我尝试combineByKey
时,它说我指定参数:Partitioner
但是,当我在 rdd 上使用它时,它可以正常工作:
我在哪里可以得到这个分区器?
scala - kafka directstream dstream映射不打印
我有这个简单的 Kafka Stream
Kafka 有消息,Spark Streaming 能够将它们作为 RDD 获取。但是我的代码中的第二个 println 不打印任何内容。我在本地 [2] 模式下运行时查看了驱动程序控制台日志,在纱线客户端模式下运行时检查了纱线日志。
我错过了什么?
以下代码代替 rdd.map,在 spark 驱动程序控制台中可以很好地打印:
但我担心这个飞行对象的处理可能发生在火花驱动程序项目中,而不是执行程序中。如果我错了,请纠正我。
谢谢
scala - How can I return two DStreams in a function after using the filter transformation in spark streaming?
In a function, is there a way to return two DStreams after using filter
?
For example when I filter a DStream
, the filtered ones will be stored in a DStream
and the unfiltered ones will be stored in another DStream
.
scala - 如何解决类型不匹配问题(预期:Double,实际:Unit)
这是我计算均方根误差的函数。但是由于错误,最后一行无法编译Type mismatch issue (expected: Double, actual: Unit)
。我尝试了许多不同的方法来解决这个问题,但仍然没有成功。有任何想法吗?
performance - 在 Spark Streaming 中的微批处理结束之前执行操作
是否有可能在 Spark Streaming 中的 DStream 内的每个微批次结束时执行一些操作?我的目标是计算 Spark 处理的事件数。Spark Streaming 给了我一些数字,但平均值似乎也总结了零值(因为一些微批次是空的)。
例如,我确实收集了一些统计数据并希望将它们发送到我的服务器,但是收集数据的对象只存在于某个批次期间,并且从头开始为下一批进行初始化。我希望能够在批处理完成并且对象消失之前调用我的“完成”方法。否则,我会丢失尚未发送到服务器的数据。