I am using a CSV dataset as input, read via readStream as follows:
# read the raw CSV events as a stream, one file per micro-batch
inDF = spark \
    .readStream \
    .option("sep", ",") \
    .option("maxFilesPerTrigger", 1) \
    .schema(rawStreamSchema) \
    .csv(rawEventsDir)
The schema is shown below:
inDF schema =
root
|-- timeStamp: timestamp (nullable = true)
|-- machine: string (nullable = true)
|-- module: string (nullable = true)
|-- component: string (nullable = true)
|-- plateID: integer (nullable = true)
|-- measureProgr: integer (nullable = true)
|-- measure: string (nullable = true)
|-- value: double (nullable = true)
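For reference, rawStreamSchema is simply a StructType matching the schema printed above. A minimal sketch of how it could be defined (field names and types are taken from the printSchema output; the definition in my script is equivalent):

from pyspark.sql.types import (StructType, StructField, TimestampType,
                               StringType, IntegerType, DoubleType)

# sketch of rawStreamSchema, mirroring the printed inDF schema
rawStreamSchema = StructType([
    StructField('timeStamp', TimestampType(), True),
    StructField('machine', StringType(), True),
    StructField('module', StringType(), True),
    StructField('component', StringType(), True),
    StructField('plateID', IntegerType(), True),
    StructField('measureProgr', IntegerType(), True),
    StructField('measure', StringType(), True),
    StructField('value', DoubleType(), True)
])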
I need to perform some aggregations, as shown below:
from pyspark.sql.functions import window, min, max, avg, stddev

# 600-second watermark on the event time, then windowed aggregation per measure
byMeasureDF = inDF \
    .withWatermark('timeStamp', '600 seconds') \
    .groupBy(window(inDF.timeStamp, windowSize, windowStart),
             inDF.machine, inDF.module,
             inDF.component, inDF.measure) \
    .agg(min(inDF.value).alias('minValue'),
         max(inDF.value).alias('maxValue'),
         avg(inDF.value).alias('avgValue'),
         stddev(inDF.value).alias('stdevValue'))
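(windowSize and windowStart are not shown above; they are ordinary duration strings passed as the second and third arguments of window(). For illustration only, they could look like this; the exact values are not the point of the question:)

# illustrative values only -- placeholders for the actual ones in my script
windowSize = '10 minutes'   # window duration (second argument of window())
windowStart = '5 minutes'   # slide duration (third argument of window())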
That works, and the output schema is indeed correct:
byMeasureDF schema =
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- machine: string (nullable = true)
|-- module: string (nullable = true)
|-- component: string (nullable = true)
|-- measure: string (nullable = true)
|-- minValue: double (nullable = true)
|-- maxValue: double (nullable = true)
|-- avgValue: double (nullable = true)
|-- stdevValue: double (nullable = true)
However, when I run the following query:
# write the aggregated stream to CSV files, with a checkpoint directory
q_byMeasure = byMeasureDF \
    .writeStream \
    .format('csv') \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .outputMode('append') \
    .queryName('byMeasure') \
    .start(path = confStreamMiningByMeasureDir,
           checkpointLocation = chkStreamMiningByMeasureDir)
I get the following error:
Traceback (most recent call last):
  File "/home/roberto/BOTT-G80216/Programs/Version_00.01.00/Python/2_fromRawToConformed.py", line 87, in <module>
    , checkpointLocation = chkStreamMiningByMeasureDir)
  File "/home/roberto/spark/python/pyspark/sql/streaming.py", line 844, in start
    return self._sq(self._jwrite.start(path))
  File "/home/roberto/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/roberto/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Note that if I write the same DataFrame to the console, it works fine.
It looks like there may be a bug in this version of Spark. Does anyone know of a possible solution? Many thanks in advance.