0

我创建了一个带有 Spark 选项集的 Cloudera 5.x 集群:

在此处输入图像描述

我想使用 PySpark 运行一个简单的测试,从一个 Datatap 读取数据并将其写入另一个 Datatap。

使用 PySpark 执行此操作的步骤是什么?

4

1 回答 1

0

对于此示例,我将使用默认为我的租户创建的 TenantStorage DTAP。

我已经从https://raw.githubusercontent.com/fivethirtyeight/data/master/airline-safety/airline-safety.csv上传了一个数据集

接下来,找到控制器节点并 ssh 进入它:

在此处输入图像描述

因为租户设置了默认的集群超级用户权限站点管理员和租户管理员),所以我可以从集群页面下载租户 ssh 密钥并使用它来 ssh 到控制器节点:

ssh bluedata@x.x.x.x -p 10007 -i ~/Downloads/BD_Demo\ Tenant.pem

x.x.x.x对我来说是我的 BlueData 网关的公共 IP 地址。

请注意,我们正在连接到端口 10007,这是控制器的端口。

运行 pyspark:

$ pyspark --master yarn --deploy-mode client --packages com.databricks:spark-csv_2.10:1.4.0

访问数据文件并检索第一条记录:

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dtap://TenantStorage/airline-safety.csv')
>>> df.take(1)

结果是:

[行(航空公司=u'Aer Lingus',avail_seat_km_per_week=320906734,events_85_99=2,fatal_accidents_85_99=0,fatalities_85_99=0,events_00_14=0,fatal_accidents_00_14=0,fatalities_00_14=0)]

如果您想从一个 Datatap 读取数据,对其进行处理并将其保存到另一个 Datatap,它看起来像这样:

>>> df_filtered = df.filter(df.incidents_85_99 == 0)
>>> df_filtered.write.parquet('dtap://OtherDataTap/airline-safety_zero_incidents.parquet')
于 2019-07-12T21:00:04.520 回答