
Using this dataset:

start,end,rms,state,maxTemp,minTemp
2019-02-20T16:16:31.752Z,2019-02-20T17:33:34.750Z,4.588481,charge,35.0,32.0
2019-02-20T17:33:34.935Z,2019-02-20T18:34:49.737Z,5.770562,discharge,35.0,33.0

and this one:

[{"EventDate":"2019-02-02T16:17:00.579Z","Value":"23"},
{"EventDate":"2019-02-02T16:18:01.579Z","Value":"23"},
{"EventDate":"2019-02-02T16:19:02.581Z","Value":"23"},
{"EventDate":"2019-02-02T16:20:03.679Z","Value":"23"},
{"EventDate":"2019-02-02T16:21:04.684Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:05.693Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:06.694Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:07.698Z","Value":"23"},
{"EventDate":"2019-02-02T17:40:08.835Z","Value":"23"}]

schema = StructType([
    StructField('EventDate', TimestampType(), True),
    StructField('Value', FloatType(), True)
])

I want to add the maximum and minimum values from the JSON dataset to the CSV dataset as new columns.
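For reference, csvDf in the snippets below is the CSV dataset above; this is roughly how I read it (the path is a placeholder, and I parse start and end to timestamps):

from pyspark.sql.functions import to_timestamp

csvDf = spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/cycles.csv")
csvDf = csvDf.withColumn("start", to_timestamp(csvDf.start)) \
             .withColumn("end", to_timestamp(csvDf.end))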

I have tried this:

cyclesWithValues = csvDf\
.withColumn("max", jsondata.filter((col("EventDate") >= csvDf.start) & (col("EventDate") <= csvDf.end)).agg({"value": "max"}).head()["max(Value)"])\
.withColumn("min", jsondata.filter((col("EventDate") >= csvDf.start) & (col("EventDate") <= csvDf.end)).agg({"value": "min"}).head()["min(Value)"])

But I get this error:

AnalysisException: 'Resolved attribute(s) start#38271,end#38272 missing from EventDate#38283,Value#38286 in operator !Filter ((EventDate#38283 >= start#38271) && (EventDate#38283 <= end#38272)).;;\n!Filter ((EventDate#38283 >= start#38271) && (EventDate#38283 <= end#38272))\n+- Project [EventDate#38283, cast(Value#38280 as float) AS Value#38286]\n   +- Project [to_timestamp(EventDate#38279, None) AS EventDate#38283, Value#38280]\n      +- Relation[EventDate#38279,Value#38280] json\n'

I have an array-based solution, but it seems very slow, so I was hoping something like this could speed it up.
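What I am hoping to express, if I understand the problem correctly, is a single conditional (range) join followed by an aggregation, instead of one query per row. A rough, untested sketch of that idea, using the column names above and the dfTemperatureCast DataFrame from the solution below:

from pyspark.sql.functions import max as max_, min as min_

# Join every temperature reading to the cycle whose time window contains it,
# then aggregate per cycle.
joined = csvDf.join(
    dfTemperatureCast,
    (dfTemperatureCast.EventDate >= csvDf.start) &
    (dfTemperatureCast.EventDate <= csvDf.end),
    "left")

cyclesWithValues = joined.groupBy("start", "end", "rms", "state") \
    .agg(max_("Value").alias("max"), min_("Value").alias("min"))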

Right now I am using this solution:

from multiprocessing.pool import ThreadPool
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType, StructField, TimestampType, FloatType, StringType

dfTemperature = spark.read.option("multiline", "true").json("path")
dfTemperatureCast = dfTemperature \
    .withColumn("EventDate", to_timestamp(dfTemperature.EventDate)) \
    .withColumn("Value", dfTemperature.Value.cast('float'))

def addValuesToDf(row):
    # One driver-side query per cycle: filter the temperatures to the cycle's
    # time window, then take the max and min of Value.
    temperatures = dfTemperatureCast.filter(
        (col("EventDate") >= row["start"]) & (col("EventDate") <= row["end"]))
    maxTemp = temperatures.agg({"Value": "max"}).head()["max(Value)"]
    minTemp = temperatures.agg({"Value": "min"}).head()["min(Value)"]
    return (row.start, row.end, row.rms, row.state, maxTemp, minTemp)

# rmsDf holds the CSV cycles; collect() brings the rows to the driver so the
# thread pool can run one query per row.
pool = ThreadPool(10)
withValues = pool.map(addValuesToDf, rmsDf.collect())

# The schema has to match the tuples returned above, field for field
# (rms is assumed to be numeric here).
schema = StructType([
    StructField('start', TimestampType(), True),
    StructField('end', TimestampType(), True),
    StructField('rms', FloatType(), True),
    StructField('state', StringType(), True),
    StructField('maxTemp', FloatType(), True),
    StructField('minTemp', FloatType(), True)
])

cyclesDF = spark.createDataFrame(withValues, schema)
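
This gives one row per cycle with the looked-up max and min; I just eyeball the result with:

cyclesDF.show(truncate=False)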
