2

当我尝试这个时:

cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).getOrCreate()

lines = spark.readStream.load(format='socket', host='localhost', port=9999,
                              schema=StructType(StructField('value', StringType, True)))
words = lines.groupBy('value').count()
query = words.writeStream.format('console').outputMode("complete").start()

query.awaitTermination()

然后我得到一些错误:

AssertionError: dataType 应该是 DataType

我在 ./pyspark/sql/types.py 的第 403 行搜索源代码:

assert isinstance(dataType, DataType), "dataType should be DataType"

但是StringType基于AtomicType而不是DataType

class StringType(AtomicType):
    """String data type.
    """

    __metaclass__ = DataTypeSingleton

那么有错误吗?

4

1 回答 1

3

在 PythonDataTypes中不用作单例。创建StructField时必须使用实例。还StructType需要一个序列StructField

StructType([StructField('value', StringType(), True)])

然而,这在这里完全没有意义。Schema ofTextSocketSource 是固定的,不能用 schema 参数修改。

于 2017-01-06T08:57:15.113 回答