问题标签 [dstream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-3.x - 使用 Pyspark 流式传输 Twitter 数据时如何检索位置
我正在使用 PYSpark 实时流式传输推文。
我想检索文本、位置、用户名。目前,我只接收推文。无论如何也可以得到位置。
我正在使用这行代码来获取推文。
apache-spark - 无法在 kafka 直接流、Spark 流中手动提交偏移量
我正在尝试验证手动偏移提交的工作。
当我尝试通过使用 thread.sleep()/jssc.stop()/ 在 while 循环中抛出异常来退出作业时,我看到正在提交偏移量。
我只是发送几条消息以进行测试,但是一旦作业开始处理批处理,我就会看到 0 滞后。
spark实际何时提交偏移量?
enable.auto.commit
: 错误的
观察throw new Exception();
in while 循环。即使批处理由于异常而失败,我也看到提交的偏移量,由于处理失败,我预计这里会有一些延迟,这里有什么问题?
python - Spark 的套接字文本流为空
我正在关注 Spark 的流媒体指南。我没有使用nc -lk 9999
,而是创建了自己的简单 Python 服务器,如下所示。从下面的代码可以看出,它会a
通过随机生成字母z
。
我用客户端代码测试了这个服务器,如下所示。
但是,我的 Spark 流代码似乎没有收到任何数据,或者它没有打印任何内容。代码如下。
我从输出中看到的只是下面的重复模式。
关于我做错了什么的任何想法?
apache-spark - 如何优雅地停止 spark dStream 进程
我正在尝试从 kafka 流中读取数据,对其进行处理并将其保存到报告中。我想每天运行一次这项工作。我正在使用 dStreams。dStreams中是否有等效的trigger(Trigger.Once)可以用于这种情况。感谢建议和帮助。
java - Reg:在 Spark 执行器中并行化 RDD 分区
我是新来的 spark 并尝试了一个示例 Spark Kafka 集成。我所做的是从单个分区的 Kafka 中发布 json:
我正在执行以下步骤:
我在 spark 作业中有一个 DStream 接收器。
我在那个 Dstream 上重新分区 3
然后我在这个 DStream 上执行一个 flatMap 来接收转换后的 RDDStream 中的分区键/值对。
我做了一个 groupbyKey 洗牌。
然后我在 RDDStream 步骤 4 上进行另一个映射转换,为每个值 json 添加更多键。
最后我做了一个 forEachPartition 并收集 RDD 并打印结果。
我在 Spark 独立模式下运行它,集群中有 3 个执行器。由于我从 Kafka 的单个分区接收数据,因此我将无法使用并行接收器来并行执行 DStream。如果我错了,请纠正我,但由于我在接收后进行了 3 的重新分区,我相信我将创建三个分区,并且这些分区上的后续映射转换将在 3 个执行器中并行执行。但我观察到的是,我的所有分区都只在一个执行程序中按顺序执行,而其他两个执行程序没有被使用。请问我可以对此提出一些建议吗?
如何在从单个分区 Kafka 主题的单个 DStream 接收器接收的并行执行器中执行 RDD 分区?
scala - Spark-Straming 中的 DStream 到 Rdd
我有一个DStream[String,String]
,我需要将其转换为RDD[String,String]
. 有什么办法吗?我需要使用Scala
语言。
提前致谢!!
apache-spark - Spark 永远不会停止处理第一批
我正在尝试运行我在 github 上找到的应用程序,这个:https ://github.com/CSIRT-MU/AIDA-Framework
我在 Ubuntu 18.04.1 虚拟机中运行它。在其数据处理管道中的某个时刻,它使用 spark 并且似乎在这一点上卡住了。我可以从 Web UI 中看到我发送的一些数据是作为批处理接收的。但是,它似乎永远不会完成第一批的处理(即使它有 0 条记录)。不幸的是,我对火花没有经验,也不知道到底是什么失败了。在搜索修复程序时,我遇到了一些建议,即可能没有足够的核心供所有执行程序使用。我试图将核心增加到 3 个,但这并没有帮助。
我已经提供了来自 Web UI 的所有屏幕,我希望它们能够清楚地显示问题。有谁知道我在这里做错了什么?
截图:Spark 1 Spark 2 Spark 3 Spark 4 Spark 5 Spark 6
排队和不完整的批处理作业的输出是
编辑:我注意到进程启动时会记录错误。我现在才意识到这一点,因为这个过程不会停止。错误是:
谁能帮我解决这些错误?
python - 计算每个 pyspark Dstream 中的元素数
我正在寻找一种方法来计算我每次在我正在使用的 pyspark 创建的 Dstream 中收到的元素数量(或 RDD 数量)。如果您知道可以帮助我的方法,我会很高兴。谢谢。
python - 获取 RDD 中每个键的最大值和最小值
这是结果的一小部分:
我想获得每个键的最大值和最小值,如何?