问题标签 [apache-spark-1.6]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
809 浏览

pyspark - pyspark 内存问题:引起:java.lang.OutOfMemoryError:Java 堆空间

伙计们,

我正在运行 pyspark 代码以从 hdfs 读取 500mb 文件并从文件内容构造一个 numpy 矩阵

集群信息:

9 个数据节点 128 GB 内存 /48 vCore CPU /节点

作业配置

错误

收集件正在吐出内存不足错误。

任何帮助深表感谢。

0 投票
2 回答
712 浏览

scala - 如何替换向量列中的空值?

我有一列 [vector] 类型的列,其中包含无法删除的空值,这是一个示例

我想做的是:

但这会引发错误:

我尝试过的其他事情:

如何从火花数据框中过滤出空值

我正在努力寻找适用于 Spark 1.6 的解决方案

0 投票
1 回答
2715 浏览

scala - 如何仅考虑列的子集删除重复项?

我使用 Spark 1.6 并在两个数据帧上进行内部连接,如下所示:

aggrgn_filter_group_id但我不断在列中得到重复的值。你能建议一些解决方案吗?

0 投票
1 回答
1050 浏览

scala - Spark Streaming 1.6 + Kafka:处于“排队”状态的批次过多

我正在使用火花流来使用来自 Kafka 主题的消息,该主题有 10 个分区。我正在使用直接方法从 kafka 消费,代码如下:

从代码中可以看出,我使用window来实现这个(如果我错了,请纠正我):由于有一个插入hive表的动作,我想避免过于频繁地写入HDFS,所以什么我想要的是在内存中保存足够的数据,然后才写入文件系统。我认为使用 window 将是实现它的正确方法。

现在,在下图中,您可以看到有许多批次正在排队,并且正在处理的批次需要很长时间才能完成。

只有一批处于处理状态,其他批次永远排队

我还提供了正在处理的单个批次的详细信息:

插入生成数千个任务! 为什么

当批处理中没有很多事件时,为什么插入操作有这么多任务?有时,事件为 0 也会产生数千个需要永远完成的任务。

我用 Spark 处理微批处理的方式错了吗?

谢谢你的帮助!

一些额外的细节:

纱线容器最大为 2gb。在这个 Yarn 队列中,容器的最大数量是 10。当我查看正在执行这个 spark 应用程序的队列的详细信息时,容器的数量非常大,大约 15k 待处理的容器。

0 投票
0 回答
98 浏览

apache-spark-sql - Spark 1.6 - 使用数据帧失败的 avro 文件覆盖目录

我在 HDFS 中有一个目录,其中包含 avro 文件。当我尝试用数据框覆盖目录时,它失败了。语法:avroData_df.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("") 错误是: Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path不存在:hdfs://nameservice1//part-r-00000-bca9a5b6-5e12-45c1-a877-b0f6d6cc8cd3.avro

在覆盖时,它似乎也在考虑 avro 文件。

我们可以使用 Spark 1.6 来做到这一点吗?

0 投票
0 回答
1975 浏览

apache-spark - 为什么persist(StorageLevel.MEMORY_AND_DISK) 给出的结果与使用HBase 的cache() 不同?

问这个问题听起来很天真,但这是我最近在项目中遇到的一个问题。需要对它有更好的理解。

每当我们在 HBase 读取中使用此类持久化时 - 相同的数据会一次又一次地返回到流作业的其他后续批次,但 HBase 会在每次批次运行时更新。

HBase 读取代码:

我替换persist(StorageLevel.MEMORY_AND_DISK)cache()它,它按预期从 HBase 表返回更新的记录。

我们尝试使用的原因persist(StorageLevel.MEMORY_AND_DISK)是确保内存中的存储不会被填满,并且我们不会在执行特定流期间重新进行所有转换。

Spark 版本- 1.6.3 HBase 版本- 1.1.2.2.6.4.42-1

有人可以向我解释一下并帮助我更好地理解吗?

0 投票
1 回答
8166 浏览

apache-spark - 如何使用 pyspark 在字段中读取带逗号的 CSV 文件?

我有一个在列值中包含逗号的 csv 文件。例如,

当数据中有额外的逗号时,这些值用双引号括起来。在上面的示例中,值为 Column1=123, Column2=45,6 和 Column3=789 但是,当尝试读取数据时,由于 Column2 字段中的额外逗号,它给了我 4 个值。

在PySpark中读取这些数据时如何获得正确的值?我正在使用Spark 1.6.3

我目前正在执行以下操作以创建一个 rdd,然后从 rdd 创建一个数据框。

0 投票
1 回答
159 浏览

csv - 如何根据pyspark中的日期字段将输入数据拆分为多个文件?

我有一个带有日期字段的配置单元表。

我正在阅读下表。

我想根据 date_field 列将输入数据拆分为多个文件,并将其放入 date_field 文件夹中。输出应如下所示。

例如:文件 (/data/2018-10-01/datafile.csv ) 应具有以下数据。

我应该采取什么方法来实现这一目标?

0 投票
1 回答
734 浏览

java - cast方法导致java spark中的空值

我有一个在两个数据帧上执行连接的简单用例,我使用的是 spark 1.6.3 版本。问题是,在尝试使用 cast 方法将字符串类型转换为整数类型时,结果列都是空值。

我已经尝试过这里提到的所有解决方案How to cast a column in dataframe? 但是所有问题都有scala api的答案,我找不到任何使用java api的人。

我无法找到它的解决方案,而且我尝试转换的列是字符串类型并且可能包含尾随空格,这可能是一个问题吗?如果是,那么我该如何删除它们,我尝试如下删除它们,但似乎不起作用。这是我第一次使用 spark 数据框,因此非常感谢任何帮助。谢谢!

0 投票
0 回答
143 浏览

scala - Spark 2.2 中的 Spark 程序比 Spark 1.6 慢两倍

我们正在将 Scala Spark 程序从 1.6.3 迁移到 2.2.0。有问题的程序有四个部分:我们称它们为 A、B、C 和 D 部分。A 部分解析输入(parquet 文件),然后缓存 DF 并创建一个表。然后B、C、D段依次对DF进行不同类型的处理。

这个 spark 作业每小时运行大约 50 次,输入文件的数量取决于当前可用的文件。executors、cores、executor memory 和 agg partitions 的数量是根据输入文件的数量和它们的大小来计算的。这些是这样计算的:

  • n_execs = min(n_infiles, num_datanodes)
  • n_cores = ceil(n_infiles/n_execs)
  • exec_mem = 1500 * n_cores (MB)
  • shuffle_partitions = n_infiles
  • driver_mem = ceil(n_infiles / 200) (GB)
  • 最大分区字节数 = 5*1024*1024(常数)

在我们的单元测试环境中,我们目前有 12 个数据节点,平均文件输入大小为 11MB / 630K 记录。所以一些例子是:

  • n_infiles (24): driver_mem (1G), n_execs (12), n_cores (2), exec_mem (3000M), shuffle_partitions (24)
  • n_infiles (4): driver_mem (1G), n_execs: (4), n_cores (1), exec_mem (1500M), shuffle_partitions (4)

现在的问题是:使用 Spark 2.2 分析 1095 次运行,我们看到 C 部分平均耗时 41.2 秒,而 D 部分平均耗时 47.73 秒。在 Spark 1.6.3 中,超过 425 次运行的 C 部分平均耗时 23 秒,而 D 部分平均耗时 64.12 秒。A 部分和 B 部分在两个版本之间具有几乎相同的平均运行时间。D 部分的改进很大,但 C 部分的问题很大。这在我们的生产集群(314 个数据节点)上根本无法很好地扩展。

C部分的一些细节:

  • 我们使用从 Section A 创建的缓存 DF
  • 来自 A 部分的 DF 在一个较小的表(约 1000 行)上连接了四次,它们被联合在一起,基本上像:

    SELECT * FROM t1 JOIN t2 WHERE t1.a = t2.x AND t2.z = 'a' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.b = t2.x AND t2.z = 'b' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.c = t2.x1 AND t1.d = t2.x2 AND t2.z = 'c' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.e = t2.y1 AND t1.f = t2.y2 AND t2.z = 'd'

  • 使用稍微不同的参数再次执行此查询。

  • 每个查询的结果都会被缓存,因为它们被过滤了几次,然后被写入 parquet。

有什么可以解释火花 1.6 和 2.2 之间的差异吗?

我是火花调整的新手,所以如果我能提供更多信息,请告诉我。