
I am using a CSV dataset as input, reading it with readStream as follows:

# Read the CSV files as a stream, processing one file per micro-batch
inDF = spark \
    .readStream \
    .option("sep", ",") \
    .option("maxFilesPerTrigger", 1) \
    .schema(rawStreamSchema) \
    .csv(rawEventsDir)

The schema is shown below:

inDF schema = 
root
 |-- timeStamp: timestamp (nullable = true)
 |-- machine: string (nullable = true)
 |-- module: string (nullable = true)
 |-- component: string (nullable = true)
 |-- plateID: integer (nullable = true)
 |-- measureProgr: integer (nullable = true)
 |-- measure: string (nullable = true)
 |-- value: double (nullable = true)
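
For reference, rawStreamSchema is defined elsewhere in my script; a minimal sketch that would reproduce the schema printed above (the construction itself is an assumption):

from pyspark.sql.types import (StructType, StructField, TimestampType,
                               StringType, IntegerType, DoubleType)

# Sketch of a schema matching the printSchema() output above
rawStreamSchema = StructType([
    StructField('timeStamp', TimestampType(), True),
    StructField('machine', StringType(), True),
    StructField('module', StringType(), True),
    StructField('component', StringType(), True),
    StructField('plateID', IntegerType(), True),
    StructField('measureProgr', IntegerType(), True),
    StructField('measure', StringType(), True),
    StructField('value', DoubleType(), True)
])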

I need to perform some aggregations, as shown below:

from pyspark.sql.functions import window, min, max, avg, stddev

# Windowed aggregation per machine/module/component/measure,
# with a 10-minute watermark on the event-time column
byMeasureDF = inDF \
        .withWatermark('timeStamp', '600 seconds') \
        .groupBy(window(inDF.timeStamp, windowSize, windowStart)
                 , inDF.machine, inDF.module
                 , inDF.component, inDF.measure) \
        .agg(min(inDF.value).alias('minValue')
             , max(inDF.value).alias('maxValue')
             , avg(inDF.value).alias('avgValue')
             , stddev(inDF.value).alias('stdevValue'))
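
Here windowSize and windowStart are duration strings defined earlier in the script; hypothetical values for illustration (not the actual ones):

windowSize = '10 minutes'    # hypothetical window duration
windowStart = '5 minutes'    # hypothetical slide duration (third positional argument of window())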

That works, and the output schema is indeed correct:

byMeasureDF schema = 
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- machine: string (nullable = true)
 |-- module: string (nullable = true)
 |-- component: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- minValue: double (nullable = true)
 |-- maxValue: double (nullable = true)
 |-- avgValue: double (nullable = true)
 |-- stdevValue: double (nullable = true)

However, when I run the following query:

# Write the aggregated stream to CSV files in append mode, with checkpointing
q_byMeasure = byMeasureDF \
          .writeStream \
          .format('csv') \
          .option('delimiter', ',') \
          .option('header', 'true') \
          .outputMode('append') \
          .queryName('byMeasure') \
          .start(path = confStreamMiningByMeasureDir
                 , checkpointLocation = chkStreamMiningByMeasureDir)

I get the following error:

Traceback (most recent call last):
  File "/home/roberto/BOTT-G80216/Programs/Version_00.01.00/Python/2_fromRawToConformed.py", line 87, in <module>
    , checkpointLocation = chkStreamMiningByMeasureDir)
  File "/home/roberto/spark/python/pyspark/sql/streaming.py", line 844, in start
    return self._sq(self._jwrite.start(path))
  File "/home/roberto/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/roberto/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Note that if I write the same DataFrame to the console, it runs fine.
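
For comparison, the console query is roughly the following (a sketch; the query name is an assumption):

q_console = byMeasureDF \
          .writeStream \
          .format('console') \
          .outputMode('append') \
          .queryName('byMeasureConsole') \
          .start()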

It looks like there may be a bug in this Spark version. Does anyone know a possible solution? Many thanks in advance.
