问题标签 [apache-spark-1.6]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
pyspark - pyspark 内存问题:引起:java.lang.OutOfMemoryError:Java 堆空间
伙计们,
我正在运行 pyspark 代码以从 hdfs 读取 500mb 文件并从文件内容构造一个 numpy 矩阵
集群信息:
9 个数据节点 128 GB 内存 /48 vCore CPU /节点
作业配置
错误
收集件正在吐出内存不足错误。
任何帮助深表感谢。
scala - 如何替换向量列中的空值?
我有一列 [vector] 类型的列,其中包含无法删除的空值,这是一个示例
我想做的是:
但这会引发错误:
我尝试过的其他事情:
我正在努力寻找适用于 Spark 1.6 的解决方案
scala - 如何仅考虑列的子集删除重复项?
我使用 Spark 1.6 并在两个数据帧上进行内部连接,如下所示:
aggrgn_filter_group_id
但我不断在列中得到重复的值。你能建议一些解决方案吗?
scala - Spark Streaming 1.6 + Kafka:处于“排队”状态的批次过多
我正在使用火花流来使用来自 Kafka 主题的消息,该主题有 10 个分区。我正在使用直接方法从 kafka 消费,代码如下:
从代码中可以看出,我使用window来实现这个(如果我错了,请纠正我):由于有一个插入hive表的动作,我想避免过于频繁地写入HDFS,所以什么我想要的是在内存中保存足够的数据,然后才写入文件系统。我认为使用 window 将是实现它的正确方法。
现在,在下图中,您可以看到有许多批次正在排队,并且正在处理的批次需要很长时间才能完成。
我还提供了正在处理的单个批次的详细信息:
当批处理中没有很多事件时,为什么插入操作有这么多任务?有时,事件为 0 也会产生数千个需要永远完成的任务。
我用 Spark 处理微批处理的方式错了吗?
谢谢你的帮助!
一些额外的细节:
纱线容器最大为 2gb。在这个 Yarn 队列中,容器的最大数量是 10。当我查看正在执行这个 spark 应用程序的队列的详细信息时,容器的数量非常大,大约 15k 待处理的容器。
apache-spark-sql - Spark 1.6 - 使用数据帧失败的 avro 文件覆盖目录
我在 HDFS 中有一个目录,其中包含 avro 文件。当我尝试用数据框覆盖目录时,它失败了。语法:avroData_df.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("") 错误是: Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path不存在:hdfs://nameservice1//part-r-00000-bca9a5b6-5e12-45c1-a877-b0f6d6cc8cd3.avro
在覆盖时,它似乎也在考虑 avro 文件。
我们可以使用 Spark 1.6 来做到这一点吗?
apache-spark - 为什么persist(StorageLevel.MEMORY_AND_DISK) 给出的结果与使用HBase 的cache() 不同?
问这个问题听起来很天真,但这是我最近在项目中遇到的一个问题。需要对它有更好的理解。
每当我们在 HBase 读取中使用此类持久化时 - 相同的数据会一次又一次地返回到流作业的其他后续批次,但 HBase 会在每次批次运行时更新。
HBase 读取代码:
我替换persist(StorageLevel.MEMORY_AND_DISK)
为cache()
它,它按预期从 HBase 表返回更新的记录。
我们尝试使用的原因persist(StorageLevel.MEMORY_AND_DISK)
是确保内存中的存储不会被填满,并且我们不会在执行特定流期间重新进行所有转换。
Spark 版本- 1.6.3 HBase 版本- 1.1.2.2.6.4.42-1
有人可以向我解释一下并帮助我更好地理解吗?
apache-spark - 如何使用 pyspark 在字段中读取带逗号的 CSV 文件?
我有一个在列值中包含逗号的 csv 文件。例如,
当数据中有额外的逗号时,这些值用双引号括起来。在上面的示例中,值为 Column1=123, Column2=45,6 和 Column3=789 但是,当尝试读取数据时,由于 Column2 字段中的额外逗号,它给了我 4 个值。
在PySpark中读取这些数据时如何获得正确的值?我正在使用Spark 1.6.3
我目前正在执行以下操作以创建一个 rdd,然后从 rdd 创建一个数据框。
csv - 如何根据pyspark中的日期字段将输入数据拆分为多个文件?
我有一个带有日期字段的配置单元表。
我正在阅读下表。
我想根据 date_field 列将输入数据拆分为多个文件,并将其放入 date_field 文件夹中。输出应如下所示。
例如:文件 (/data/2018-10-01/datafile.csv ) 应具有以下数据。
我应该采取什么方法来实现这一目标?
java - cast方法导致java spark中的空值
我有一个在两个数据帧上执行连接的简单用例,我使用的是 spark 1.6.3 版本。问题是,在尝试使用 cast 方法将字符串类型转换为整数类型时,结果列都是空值。
我已经尝试过这里提到的所有解决方案How to cast a column in dataframe? 但是所有问题都有scala api的答案,我找不到任何使用java api的人。
我无法找到它的解决方案,而且我尝试转换的列是字符串类型并且可能包含尾随空格,这可能是一个问题吗?如果是,那么我该如何删除它们,我尝试如下删除它们,但似乎不起作用。这是我第一次使用 spark 数据框,因此非常感谢任何帮助。谢谢!
scala - Spark 2.2 中的 Spark 程序比 Spark 1.6 慢两倍
我们正在将 Scala Spark 程序从 1.6.3 迁移到 2.2.0。有问题的程序有四个部分:我们称它们为 A、B、C 和 D 部分。A 部分解析输入(parquet 文件),然后缓存 DF 并创建一个表。然后B、C、D段依次对DF进行不同类型的处理。
这个 spark 作业每小时运行大约 50 次,输入文件的数量取决于当前可用的文件。executors、cores、executor memory 和 agg partitions 的数量是根据输入文件的数量和它们的大小来计算的。这些是这样计算的:
- n_execs = min(n_infiles, num_datanodes)
- n_cores = ceil(n_infiles/n_execs)
- exec_mem = 1500 * n_cores (MB)
- shuffle_partitions = n_infiles
- driver_mem = ceil(n_infiles / 200) (GB)
- 最大分区字节数 = 5*1024*1024(常数)
在我们的单元测试环境中,我们目前有 12 个数据节点,平均文件输入大小为 11MB / 630K 记录。所以一些例子是:
- n_infiles (24): driver_mem (1G), n_execs (12), n_cores (2), exec_mem (3000M), shuffle_partitions (24)
- n_infiles (4): driver_mem (1G), n_execs: (4), n_cores (1), exec_mem (1500M), shuffle_partitions (4)
现在的问题是:使用 Spark 2.2 分析 1095 次运行,我们看到 C 部分平均耗时 41.2 秒,而 D 部分平均耗时 47.73 秒。在 Spark 1.6.3 中,超过 425 次运行的 C 部分平均耗时 23 秒,而 D 部分平均耗时 64.12 秒。A 部分和 B 部分在两个版本之间具有几乎相同的平均运行时间。D 部分的改进很大,但 C 部分的问题很大。这在我们的生产集群(314 个数据节点)上根本无法很好地扩展。
C部分的一些细节:
- 我们使用从 Section A 创建的缓存 DF
来自 A 部分的 DF 在一个较小的表(约 1000 行)上连接了四次,它们被联合在一起,基本上像:
SELECT * FROM t1 JOIN t2 WHERE t1.a = t2.x AND t2.z = 'a' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.b = t2.x AND t2.z = 'b' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.c = t2.x1 AND t1.d = t2.x2 AND t2.z = 'c' UNION ALL SELECT * FROM t1 JOIN t2 WHERE t1.e = t2.y1 AND t1.f = t2.y2 AND t2.z = 'd'
使用稍微不同的参数再次执行此查询。
每个查询的结果都会被缓存,因为它们被过滤了几次,然后被写入 parquet。
有什么可以解释火花 1.6 和 2.2 之间的差异吗?
我是火花调整的新手,所以如果我能提供更多信息,请告诉我。