问题标签 [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
86 浏览

python-3.x - 使用 Pyspark 流式传输 Twitter 数据时如何检索位置

我正在使用 PYSpark 实时流式传输推文。

我想检索文本、位置、用户名。目前,我只接收推文。无论如何也可以得到位置。

我正在使用这行代码来获取推文。

0 投票
2 回答
1044 浏览

apache-spark - 无法在 kafka 直接流、Spark 流中手动提交偏移量

我正在尝试验证手动偏移提交的工作。

当我尝试通过使用 thread.sleep()/jssc.stop()/ 在 while 循环中抛出异常来退出作业时,我看到正在提交偏移量。

我只是发送几条消息以进行测试,但是一旦作业开始处理批处理,我就会看到 0 滞后。

spark实际何时提交偏移量?

enable.auto.commit: 错误的

观察throw new Exception();in while 循环。即使批处理由于异常而失败,我也看到提交的偏移量,由于处理失败,我预计这里会有一些延迟,这里有什么问题?

0 投票
1 回答
684 浏览

python - Spark 的套接字文本流为空

我正在关注 Spark 的流媒体指南。我没有使用nc -lk 9999,而是创建了自己的简单 Python 服务器,如下所示。从下面的代码可以看出,它会a通过随机生成字母z

我用客户端代码测试了这个服务器,如下所示。

但是,我的 Spark 流代码似乎没有收到任何数据,或者它没有打印任何内容。代码如下。

我从输出中看到的只是下面的重复模式。

关于我做错了什么的任何想法?

0 投票
1 回答
57 浏览

apache-spark - 如何优雅地停止 spark dStream 进程

我正在尝试从 kafka 流中读取数据,对其进行处理并将其保存到报告中。我想每天运行一次这项工作。我正在使用 dStreams。dStreams中是否有等效的trigger(Trigger.Once)可以用于这种情况。感谢建议和帮助。

0 投票
0 回答
43 浏览

java - Reg:在 Spark 执行器中并行化 RDD 分区

我是新来的 spark 并尝试了一个示例 Spark Kafka 集成。我所做的是从单个分区的 Kafka 中发布 json:

我正在执行以下步骤:

  1. 我在 spark 作业中有一个 DStream 接收器。

  2. 我在那个 Dstream 上重新分区 3

  3. 然后我在这个 DStream 上执行一个 flatMap 来接收转换后的 RDDStream 中的分区键/值对。

  4. 我做了一个 groupbyKey 洗牌。

  5. 然后我在 RDDStream 步骤 4 上进行另一个映射转换,为每个值 json 添加更多键。

  6. 最后我做了一个 forEachPartition 并收集 RDD 并打印结果。

我在 Spark 独立模式下运行它,集群中有 3 个执行器。由于我从 Kafka 的单个分区接收数据,因此我将无法使用并行接收器来并行执行 DStream。如果我错了,请纠正我,但由于我在接收后进行了 3 的重新分区,我相信我将创建三个分区,并且这些分区上的后续映射转换将在 3 个执行器中并行执行。但我观察到的是,我的所有分区都只在一个执行程序中按顺序执行,而其他两个执行程序没有被使用。请问我可以对此提出一些建议吗?

如何在从单个分区 Kafka 主题的单个 DStream 接收器接收的并行执行器中执行 RDD 分区?

0 投票
1 回答
44 浏览

scala - Spark-Straming 中的 DStream 到 Rdd

我有一个DStream[String,String],我需要将其转换为RDD[String,String]. 有什么办法吗?我需要使用Scala语言。

提前致谢!!

0 投票
1 回答
154 浏览

apache-spark - Spark 永远不会停止处理第一批

我正在尝试运行我在 github 上找到的应用程序,这个:https ://github.com/CSIRT-MU/AIDA-Framework

我在 Ubuntu 18.04.1 虚拟机中运行它。在其数据处理管道中的某个时刻,它使用 spark 并且似乎在这一点上卡住了。我可以从 Web UI 中看到我发送的一些数据是作为批处理接收的。但是,它似乎永远不会完成第一批的处理(即使它有 0 条记录)。不幸的是,我对火花没有经验,也不知道到底是什么失败了。在搜索修复程序时,我遇到了一些建议,即可能没有足够的核心供所有执行程序使用。我试图将核心增加到 3 个,但这并没有帮助。

我已经提供了来自 Web UI 的所有屏幕,我希望它们能够清楚地显示问题。有谁知道我在这里做错了什么?

截图:Spark 1 Spark 2 Spark 3 Spark 4 Spark 5 Spark 6

排队和不完整的批处理作业的输出是

编辑:我注意到进程启动时会记录错误。我现在才意识到这一点,因为这个过程不会停止。错误是:

谁能帮我解决这些错误?

0 投票
0 回答
72 浏览

scala - ConstantInputDStream.print() 什么都不做

我正在尝试打印一个简单的 DStream,但没有成功。请参阅下面的代码。我在 Azure 中使用 Databricks 笔记本。

输出是:

我期待以一种或另一种方式看到 0,1,2。

我也试过添加

但它永远不会结束。看截图: 在此处输入图像描述

0 投票
1 回答
85 浏览

python - 计算每个 pyspark Dstream 中的元素数

我正在寻找一种方法来计算我每次在我正在使用的 pyspark 创建的 Dstream 中收到的元素数量(或 RDD 数量)。如果您知道可以帮助我的方法,我会很高兴。谢谢。

0 投票
2 回答
1217 浏览

python - 获取 RDD 中每个键的最大值和最小值

这是结果的一小部分:

我想获得每个键的最大值和最小值,如何?