1

我正在查询 CosmosDb 集合,并且可以打印结果。当我尝试将结果存储到 Spark DataFrame 时,它​​失败了。

以本站为例:

如何在 python 中从 Azure 的 CosmosDB 读取数据

遵循上面链接中的确切步骤。此外,尝试以下

 df = spark.createDataFrame(dataset)

这会引发此错误:

ValueError:某些类型在推断后无法确定

ValueError Traceback (last last call last)
in ()
25 print (dataset)
26
---> 27 df = spark.createDataFrame(dataset)
28 df.show()
29

/databricks/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
808 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
809 else:
--> 810 rdd, schema = self._createFromLocal(map(prepare, data), schema)
811 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
812 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd (), schema.json())

/databricks/spark/python/pyspark/sql/session.py 在 _createFromLocal(self, data, schema)
440 中写入临时文件。
441 """
--> 442 数据,模式 = self._wrap_data_schema(数据,模式)
443 返回 self._sc.parallelize(数据),模式

但是,希望将其保存为 Spark DataFrame

任何帮助将非常感激。谢谢!!!>

4

2 回答 2

0

为了推断字段类型,PySpark 查看每个字段中的非无记录。如果一个字段只有 None 记录,PySpark 无法推断类型并会引发该错误。

手动定义架构将解决问题

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> schema = StructType([StructField("foo", StringType(), True)])
>>> df = spark.createDataFrame([[None]], schema=schema)
>>> df.show()
+----+
|foo |
+----+
|null|
+----+

希望能帮助到你。

于 2019-05-03T11:19:09.940 回答
0

我看到您使用旧的 Python SDK for DocumentDB 来查询 CosmosDB 文档以创建 PySpark DataFrame 对象,从而遵循我之前的回答。但是你不能直接将方法的结果docs作为client.ReadDocuments参数data传递给函数SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True),因为数据类型不同,如下所示。

该函数createDataFrame需要一个参数,该参数data必须是RDDlistpandas.DataFrame

在此处输入图像描述

pydocumentdb-2.3.3.tar.gz但是,我从https://pypi.org/project/pydocumentdb/#files下载了源代码并查看了代码文件document_client.py& query_iterable.py

# from document_client.py
def ReadDocuments(self, collection_link, feed_options=None):
    """Reads all documents in a collection.

    :param str collection_link:
        The link to the document collection.
    :param dict feed_options:

    :return:
        Query Iterable of Documents.
    :rtype:
        query_iterable.QueryIterable

    """
    if feed_options is None:
        feed_options = {}

    return self.QueryDocuments(collection_link, None, feed_options)

# query_iterable.py
class QueryIterable(object):
    """Represents an iterable object of the query results.
    QueryIterable is a wrapper for query execution context.
    """

所以要解决你的问题,你必须pandas.DataFrame首先通过迭代方法的结果来创建一个对象Query Iterable of DocumentsReadDocuments然后通过创建一个 PySpark DataFrame 对象spark.createDataFrame(pandas_df)

于 2019-05-09T08:07:06.620 回答