问题标签 [apache-spark-1.3]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1684 浏览

python-2.7 - 火花流。Py4j 的问题:获取新的通信通道时出错

我目前在 Spark 1.3 和 Python 2.7 上具有 50 个节点的集群上运行实时 Spark Streaming 作业。Spark 流上下文从 HDFS 中的目录读取,批处理间隔为 180 秒。以下是 Spark 作业的配置:

spark-submit --master yarn-client --executor-cores 5 --num-executors 10 --driver-memory 10g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.yarn.driver.memoryOverhead= 2048 --conf spark.network.timeout=300 --executor-memory 10g

这项工作在大多数情况下运行良好。但是,它在大约 15 小时后抛出 Py4j 异常,理由是它无法获得通信通道

我尝试减小 Batch Interval 大小,但随后会产生处理时间大于 Batch Interval 的问题。

下面是错误的截图

Py4j错误

我做了一些研究,发现这可能是来自此处SPARK-12617的 Socket 描述符泄漏的问题

但是,我无法解决该错误并解决它。有没有办法手动关闭可能阻止提供端口的打开连接。或者我是否必须对代码进行任何特定更改才能解决此问题。

TIA

0 投票
1 回答
1141 浏览

scala - 使用 Scala + Spark 1.3 逐步添加到 Hive 表

我们的集群有 Spark 1.3 和 Hive 有一个大型 Hive 表,我需要向其中添加随机选择的行。有一个较小的表,我读取并检查一个条件,如果该条件为真,那么我获取我需要的变量,然后查询要填充的随机行。我所做的是在该条件下进行查询table.where(value<number),然后使用take(num rows). 然后,由于所有这些行都包含我需要的关于大型 hive 表中需要哪些随机行的信息,所以我遍历数组。

当我进行查询时,我在查询中使用ORDER BY RAND()(使用sqlContext)。我创建了一个var Hive table(可变的)从较大的表中添加一列。在循环中,我做了一个 unionAllnewHiveTable = newHiveTable.unionAll(random_rows)

我尝试了许多不同的方法来做到这一点,但不确定什么是避免 CPU 和临时磁盘使用的最佳方法。我知道 Dataframes 不适合增量添加。我现在要尝试的一件事是创建一个 cvs 文件,在循环中逐步将随机行写入该文件,然后当循环完成时,将 cvs 文件加载为表,然后执行一个 unionAll 以获得我的最终结果桌子。

任何反馈都会很棒。谢谢

0 投票
0 回答
1186 浏览

scala - 使用 Scala 和 Spark 在单独的 Hive 分区上并行运行任务,以加快加载 Hive 并将结果写入 Hive 或 Parquet

这个问题是 [this one] 的衍生问题(将行列表保存到 pyspark 中的 Hive 表)。

编辑请在这篇文章的底部查看我的更新编辑

我已经使用 Scala 和现在的 Pyspark 来完成相同的任务,但是我遇到了将数据帧保存到镶木地板或 csv 或将数据帧转换为列表或数组类型数据结构的速度非常慢的问题。以下是相关的 python/pyspark 代码和信息:

我曾尝试在 Scala 中执行上述操作,并且遇到了类似的问题。我可以轻松加载配置单元表或查询配置单元表,但需要进行随机洗牌或存储大型数据帧会遇到内存问题。能够添加 2 个额外的列也存在一些挑战。

我要添加行的 Hive 表 (hiveTemp) 有 5,570,000 ~550 万行和 120 列。

我在 for 循环中迭代的 Hive 表有 5000 行和 3 列。有 25 个唯一的val1(hiveTemp 中的一列),val1以及val23000 的组合。Val2 可以是 5 列之一及其特定的单元格值。这意味着如果我调整了代码,那么我可以将行的查找次数从 5000 减少到 26,但是我必须检索、存储和随机洗牌的行数会非常大,因此会出现内存问题(除非有人对此有建议)

至于我需要添加到表中的总行数可能约为 100,000。

最终目标是让 5.5mill 行的原始表附加 100k+ 行写成 hive 或 parquet 表。如果它更容易,我可以在它自己的表中编写 100k 行,以后可以合并到 5.5 mill 表中

Scala 或 Python 很好,尽管 Scala 更受欢迎。

对此的任何建议以及最好的选择都会很棒。

非常感谢!

编辑我对这个问题的一些额外想法:我使用哈希分区器将配置单元表分区为 26 个分区。这是基于有 26 个不同值的列值。我想在 for 循环中执行的操作可以泛化,以便它只需要在这些分区中的每一个上发生。话虽如此,我怎么能,或者我可以在网上查看什么指南,以便能够编写 scala 代码来执行此操作,并让一个单独的执行器在每个分区上执行这些循环中的每一个?我认为这会使事情变得更快。
我知道如何使用多线程来做这样的事情,但不确定如何在 scala/spark 范式中使用。

0 投票
1 回答
6041 浏览

apache-spark - 如何在完成并关闭上下文后查看 Spark 作业的日志?

我在跑步pyspark,,,,spark 1.3standalone modeclient mode

我正在尝试通过查看过去的工作并进行比较来调查我的火花工作。我想查看他们的日志、提交作业的配置设置等。但是在上下文关闭后查看作业日志时遇到了麻烦。

当我提交工作时,我当然会打开一个 spark 上下文。在作业运行时,我可以使用 ssh 隧道打开spark Web UI 。而且,我可以通过localhost:<port no>. 然后我可以查看当前正在运行的作业以及已完成的作业,如下所示:

火花网络用户界面示例

然后,如果我希望查看特定作业的日志,我可以使用 ssh 隧道端口转发来查看该作业的特定机器的特定端口上的日志。

然后,有时作业会失败,但上下文仍然是打开的。发生这种情况时,我仍然可以通过上述方法查看日志。

但是,由于我不想一次打开所有这些上下文,所以当作业失败时,我会关闭上下文。当我关闭上下文时,该作业出现在上图中的“已完成的应用程序”下。现在,当我尝试使用 ssh 隧道端口转发来查看日志时,和以前一样(localhost:<port no>),它给了我一个page not found.

关闭上下文后如何查看作业的日志?spark context而且,这对日志的保存位置和保存位置之间的关系意味着什么?谢谢你。

再次,我正在跑步pyspark,,,,。spark 1.3standalone modeclient mode

0 投票
2 回答
49 浏览

apache-spark - 在 Spark 中,我如何通过名称本身而不是索引来读取字段

我使用 Spark 1.3。

我的数据有 50 多个属性,因此我选择了自定义类。

如何通过名称而不是位置访问自定义类中的字段

这里每次我需要调用一个方法 productElement(0)

我也不应该使用案例类,因此我使用自定义类作为模式。

我的火花代码:

如果我尝试从 OnlineEvents 访问任何字段,那么我需要使用 productElement() 。(即 online.productElement(0) 用于 gsm_id )

我可以直接以 online.gsm_id ... online.event_type 访问该字段,以便我的代码易于阅读

当我将自定义类用于模式时,如何通过字段名称直接访问字段?

0 投票
1 回答
1074 浏览

java - RDD.saveAsTextFile 之后的空文件是什么?

我通过学习 Spark:Lightning Fast Data Analysis 中的一些示例来学习 Spark,然后添加我自己的开发。

我创建了这个类来了解基本的转换和操作。

这是我用来运行它的命令:

这是日志文件的内容:

我注意到的一件事是输出创建了几个文件,而不是我期望的一个文件。

这些文件是:

看起来好像为每个警告/错误“分组”创建了一个文件。空白文件有什么用?

另外,这可能是我的代码中尚未找到的东西,还是 Spark 的特征?