5

我正在使用 Zeppelin 0.5.5。我在此处为 python 找到了此代码/示例,因为我无法自己使用 %pyspark http://www.makedatauseful.com/python-spark-sql-zeppelin-tutorial/。我感觉他的 %pyspark 示例有效,因为如果您使用原始的 %spark zeppelin 教程,则“银行”表已经创建。

此代码在笔记本中。

%pyspark
from os import getcwd
# sqlContext = SQLContext(sc) # Removed with latest version I tested
zeppelinHome = getcwd()
bankText = sc.textFile(zeppelinHome+"/data/bank-full.csv")

bankSchema = StructType([StructField("age", IntegerType(),     False),StructField("job", StringType(), False),StructField("marital", StringType(), False),StructField("education", StringType(), False),StructField("balance", IntegerType(), False)])

bank = bankText.map(lambda s: s.split(";")).filter(lambda s: s[0] != "\"age\"").map(lambda s:(int(s[0]), str(s[1]).replace("\"", ""), str(s[2]).replace("\"", ""), str(s[3]).replace("\"", ""), int(s[5]) ))

bankdf = sqlContext.createDataFrame(bank,bankSchema)
bankdf.registerAsTable("bank")

此代码在同一个笔记本中,但在不同的工作台上。

%sql 
SELECT count(1) FROM bank

org.apache.spark.sql.AnalysisException: no such table bank; line 1 pos 21
...
4

2 回答 2

8

我发现了这个问题的问题。在 0.6.0 之前,sqlContext 变量是 %pyspark 中的 sqlc。

缺陷可以在这里找到:https ://issues.apache.org/jira/browse/ZEPPELIN-134

在 Pyspark 中,SQLContext 当前在变量名 sqlc 中可用。这与文档和 scala 中的变量名 sqlContext 不一致。

sqlContext 可以用作 SQLContext 的变量,除了 sqlc(为了向后兼容)

相关代码: https ://github.com/apache/incubator-zeppelin/blob/master/spark/src/main/resources/python/zeppelin_pyspark.py#L66

建议的解决方法只是在 %pyspark 脚本中执行以下操作

sqlContext = sqlc

在这里找到:

https://mail-archives.apache.org/mod_mbox/incubator-zeppelin-users/201506.mbox/%3CCALf24sazkTxVd3EpLKTWo7yfE4NvW032j346N+6AuB7KKZS_AQ@mail.gmail.com%3E

于 2015-11-24T17:51:36.927 回答
1

代替 sqlContext,使用 sqlc 并将 registerAsTable 替换为 sqlc.registerDataFrameAsTable

%pyspark
from os import getcwd
zeppelinHome = getcwd()
bankText = sc.textFile(zeppelinHome+"/data/bank-full.csv")

bankSchema = StructType([StructField("age", IntegerType(), False),StructField("job", StringType(), False),StructField("marital", StringType(), False),StructField("education", StringType(), False),StructField("balance", IntegerType(), False)])

bank = bankText.map(lambda s: s.split(";")).filter(lambda s: s[0] != "\"age\"").map(lambda s:(int(s[0]), str(s[1]).replace("\"", ""), str(s[2]).replace("\"", ""), str(s[3]).replace("\"", ""), int(s[5]) ))

bankdf = sqlc.createDataFrame(bank,bankSchema)
sqlc.registerDataFrameAsTable(bankdf, "bank")


%sql 
SELECT count(1) FROM bank
于 2015-11-28T00:04:26.110 回答