问题标签 [apache-spark-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
13532 浏览

sql - Spark SQL:IF 中的 NULL 处理

我正在尝试在IF左外部连接输出之上执行 [spark's coalesce],但似乎NULL没有按预期处理。这是我的基表、示例查询、输出和预期输出-

基表:

t1:
a,100
b,101
c,102

时间2:
101

询问:

select ax, a.x1, IF(b.x1 is NULL,a.x1,b.x1) from t1 a LEFT OUTER JOIN t2 b on a.x1=b.x1;

输出:

a,100,null
b,101,101
c,102,null

预期的:

a,100,100
b,101,101
c,102,102

我也尝试过包装上面的查询,然后在上面执行一个 IF。但没有成功。请建议我错过了什么。

0 投票
0 回答
1550 浏览

apache-spark - “spark-shell --master yarn-client”时出现异常

当我跑

出现错误如下:

这部分错误很突出:

如何解决此错误?

0 投票
2 回答
2004 浏览

apache-spark - 集成 SQL 和 Spark Streaming 时出现不可序列化异常

除了集成 Spark SQL 和 Spark Streaming 时出现的 Not Serializable 异常

我的源代码

JavaSQLContext 也在 ForeachRDD 循环之外声明,但我仍然收到 NonSerializableException

23 年 14 月 12 日 23:49:38 错误 JobScheduler:运行作业流作业时出错 1419378578000 ms.1 org.apache.spark.SparkException:在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala :166) org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) org.apache.spark.SparkContext.clean(SparkContext.scala:1435) org.apache.spark.rdd.RDD .map(RDD.scala:27​​1) 在 org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78) 在 org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD .scala:42) 在 com.basic.spark.NumberCount$2.call(NumberCount.java:79) 在 com.basic.spark.NumberCount$2.call(NumberCount.java:67) 在 org.apache.spark.streaming。 api.java.JavaDStreamLike$$anonfun$foreachRDD$1。在 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:27​​4) 在 org.apache.spark.streaming.dstream.DStream$ 应用(JavaDStreamLike.scala:27​​4) $anonfun$foreachRDD$1.apply(DStream.scala:529) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529) 在 org.apache.spark.streaming。 dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) 在 org.apache。 spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run (Job.scala:32) 在 org.apache.spark.streaming.scheduler。JobScheduler$JobHandler.run(JobScheduler.scala:171) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java .lang.Thread.run(Thread.java:724) 引起:java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) 在 java.io。 java.io.ObjectOutputStream 中的 ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)。defaultWriteFields(ObjectOutputStream.java:1541) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java :1175) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) 在 java .io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 上的 apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) ... 还有 20 个

如果您有任何建议,我将不胜感激。

0 投票
0 回答
653 浏览

apache-spark - 加入 3 个 SchemaRDD

我正在使用三个 SchemaRDD 进行三向连接(每个记录大约为一百万条记录,存储在 HDFS 上的 Parquet 文件中)。

架构如下:

  • table1 有四个字段:id、group_id、t2_id 和日期
  • table2 有三个字段:id、group_id 和 t3_id
  • table3 有三个字段:id、group_id 和 date

我试图找出组内 table1 和 table3 之间的关系。

我将使用的 SQL 查询是:

但是,我正在尝试在 Spark 中执行此操作:

所以这似乎可行——但它在同一个(3 单元,EMR)集群上的运行速度明显慢于 impala。这是正确的方法吗?有没有办法让它更高效?

谢谢你的帮助

0 投票
2 回答
7058 浏览

java - Spark SQL 性能

我的代码算法如下
Step1。获取一个 hbase 实体数据到 hBaseRDD

步骤 2。将 hBaseRDD 转换为 rowPairRDD

步骤 3。将 rowPairRDD 转换为 schemaRDD

第四步。使用 spark sql 做第一个简单的 sql 查询。

步骤 5。使用 spark sql 做第二个简单的 sql 查询。

第六步。使用 spark sql 做第三个简单的 sql 查询。

测试结果如下:

测试用例1

当我插入 300,000 条记录时,hbase 实体,然后运行代码。

  • 第一次查询需要 60407 毫秒
  • 第二次查询需要 838 毫秒
  • 3td 查询需要 792 毫秒

如果我使用 hbase Api 做类似的查询,只需要 2000 毫秒。显然最后 2 个 spark sql 查询比 hbase api 查询快得多。
我相信第一个 spark sql 查询会花费大量时间从 hbase 加载数据。
所以第一个查询比最后两个查询慢得多。我认为结果是预期的

测试用例2

当我插入 400,000 条记录时。hbase 实体,然后运行代码。

  • 第一次查询需要 87213 毫秒
  • 第二次查询需要 83238 毫秒
  • 3td 查询需要 82092 毫秒

如果我使用 hbase Api 做类似的查询,只需要 3500 毫秒。显然 3 spark sql 查询比 hbase api 查询慢得多。
而且最后2个spark sql查询也很慢,性能和第一个查询差不多,为什么?如何调整性能?

0 投票
1 回答
367 浏览

postgresql - 将 PostgreSQL 数据库加载到 SchemaRDD

我在 PostgreSQL 中有 100 万行和 100 多列的数据源,我想使用 Spark SQL,所以我想将此数据源转换为SchemaRDD.

Spark SQL编程指南中介绍了两种方法,一种是通过反射,也就是说我需要定义:

这很乏味,因为我有 100 多列。

另一种方法是“以编程方式指定架构”,这意味着我需要定义:

这对我来说也很乏味。

实际上,还有另一个问题,因为我PostgreSQL使用类加载我的数据库,JdbcRDD但我发现我还需要在构造函数的mapRow参数中定义模式JdbcRDD,如下所示:

这个 API 仍然要求我自己创建模式,更糟糕的是我需要重做类似的事情来将其转换JdbcRDDSchemaRDD,那将是非常笨拙的代码。

所以我想知道这项任务的最佳方法是什么?

0 投票
0 回答
1171 浏览

scala - Apache SPARK zipWithUniqueID

我需要为我的每个元组生成唯一的 id [相当于 ETL 维度处理中的代理键生成]

我将处理 40 亿行。我有点担心第 3 行的性能,其中获取生成的代理键的最大值..

有没有更好的清洁方法来达到同样的效果?

0 投票
2 回答
5102 浏览

apache-spark - spark RDD union 非常慢

我有 2 个 spark RDD、dataRDD 和 newPairDataRDD,它们用于 spark SQL 查询。当我的应用程序初始化时,dataRDD 将被初始化。一个指定的 hbase 实体中的所有数据都将存储到 dataRDD。

当客户端的 sql 查询到来时,我的 APP 将获得所有新的更新和插入到 newPairDataRDD。dataRDD 联合 newPairDataRDD 并在 spark SQL 上下文中注册为表。

我什至在 dataRDD 中发现了 0 条记录,在 newPairDataRDD 中发现了 1 条新插入记录。联合需要 4 秒。这太慢了

我认为这是不合理的。任何人都知道如何使它更快?感谢下面的简单代码

从 spark web ui,我可以看到下面。显然它需要4s来联合

已完成的阶段 (8)

StageId 描述 提交的持续时间 任务:成功/总输入 Shuffle Read Shuffle Write

6 在 SparkPlan.scala 收集:85+详细信息 1/4/2015 8:17 2 s 8-Aug 156.0 B

SparkSqlQueryForMarsNew.java:389+details 的 7 联合 1/4/2015 8:17 4 s 8-Aug 64.0 B 156.0 B

0 投票
1 回答
1446 浏览

java - SparkSQL - 集合上的 CassandraSqlContext 查询(集)

可以说我有一张像这样的桌子:

和索引:

我想使用 sql 上下文查询书籍。我正在做的是:

以下查询不适用于Cassandra :

它返回只有“book1”的用户。我尝试过类似的查询,books CONTAINS ('book1', 'book2')但没有一个有效。

我可以在注册表上做的是:

我想做的是通过以下书籍查询:

或类似的查询。

但它不起作用。它返回 0 条记录。我试图注册名为的索引表,user_books_idx但它也没有工作。我可以查询索引集合吗?我该怎么做?

0 投票
2 回答
2195 浏览

scala - 注册表时的SparkSQL MissingRequirementError

我是 Scala 和 Apache Spark 的新手,我正在尝试使用 Spark SQL。克隆repobin/spark-shell后,我通过键入并运行以下命令启动了 spark shell :

一切都按预期工作。该users.txt文件类似于:

之后,我尝试创建一个独立项目,并使用sbt. 中列出的依赖项build.sbt如下:

如果我运行相同的指令,它会在这一行崩溃:

出现此错误:

问题是什么?

更新:

好的,我认为问题不是 Spark SQL,而是 Spark 本身,因为我什至无法执行users.collect(). 相反,如果它在 spark shell 中运行,结果是:

正如预期的那样。错误如下:

在以编程方式提交作业时,我还在 Spark EC2 集群上发现了这个java.io.EOFException ,但我不知道hadoop-client可能需要哪个版本。