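In Azure Databricks Structured Streaming (Scala notebook, connected to Azure IoT Hub) I open a stream on the Event Hub-compatible endpoint of an Azure IoT Hub. I then parse the incoming stream against a structured schema and create 3 queries (groupBy) on that same stream. Most of the time (though apparently not always) one of the display queries fails with an exception about the epoch value on a partition (see below). I am using a dedicated consumer group that no other application reads from. So I would have expected that opening 1 stream and running multiple streaming queries against it is supported?
Any suggestions, explanations, or ideas for solving this? (I would like to avoid having to create 3 consumer groups and define the stream 3 times.)
Exception example: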
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1064.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1064.0 (TID 24790, 10.139.64.10, executor 7): java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver with higher epoch of '0' is created hence current receiver with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:xxxx, SystemTracker:iothub-name|databricks-db, Timestamp:2019-02-18T15:25:19, errorContext[NS: yyy, PATH: savanh-traffic-camera2/ConsumerGroups/databricks-db/Partitions/3, REFERENCE_ID: a0e445_7319_G2_1550503505013, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
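For context, the epoch rule named in the error message can be modelled in a few lines. This is a minimal sketch, not the Event Hubs SDK: `Receiver`, `PartitionSlot`, and `attach` are hypothetical names. It assumes, following the documented behaviour of Event Hubs epoch receivers, that a consumer group and partition pair holds at most one epoch receiver, that a newcomer with an equal or higher epoch disconnects the current holder, and that a newcomer with a lower epoch is refused. It also assumes that each streaming query opens its own receiver with epoch 0, which would match the message above but is not confirmed here.

```scala
// Simplified, hypothetical model of the epoch rule; none of these types exist in the SDK.
final case class Receiver(owner: String, epoch: Long)

sealed trait Outcome
// The newcomer took the slot; `evicted` is the receiver it disconnected, if any.
final case class Attached(evicted: Option[Receiver]) extends Outcome
// The newcomer was refused because the current holder has a higher epoch.
final case class Rejected(current: Receiver) extends Outcome

// One slot per (consumer group, partition): at most one epoch receiver at a time.
final class PartitionSlot {
  private var current: Option[Receiver] = None

  def attach(incoming: Receiver): Outcome = current match {
    case Some(existing) if existing.epoch > incoming.epoch =>
      Rejected(existing)
    case previous =>
      // Equal or higher epoch wins: the previous holder is disconnected.
      current = Some(incoming)
      Attached(previous)
  }

  def holder: Option[Receiver] = current
}

// Three queries, same consumer group, same partition, all using epoch 0.
val slot = new PartitionSlot
val outcomes = Seq("window", "make", "country").map(q => slot.attach(Receiver(q, 0L)))
outcomes.foreach(println)
println(slot.holder)
```

Under these assumptions, each additional query on the same consumer group evicts the previous query's receiver on that partition, which would be consistent with the failure showing up in only one of the display queries at a time.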
This is my code (cleaned up):
// Imports the snippet relies on
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Define schema and create incoming camera eventstream
val cameraEventSchema = new StructType()
  .add("TrajectId", StringType)
  .add("EventTime", StringType)
  .add("Country", StringType)
  .add("Make", StringType)

val iotHubParameters =
  EventHubsConf(cameraHubConnectionString)
    .setConsumerGroup("databricks-db")
    .setStartingPosition(EventPosition.fromEndOfStream)

val incomingStream = spark.readStream.format("eventhubs").options(iotHubParameters.toMap).load()
// Define parsing query selecting the required properties from the incoming telemetry data
val cameraMessages =
  incomingStream
    .withColumn("Offset", $"offset".cast(LongType))
    .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
    .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
    .withColumn("Body", $"body".cast(StringType))
    // Select the event hub fields so we can work with them
    .select("Offset", "Time (readable)", "Timestamp", "Body")
    // Parse the "Body" column as JSON using the schema defined above
    .select(from_json($"Body", cameraEventSchema) as "cameraevents")
    // Now select the values from our JSON structure and cast them manually to avoid problems
    .select(
      $"cameraevents.TrajectId".cast("string").alias("TrajectId"),
      $"cameraevents.EventTime".cast("timestamp").alias("EventTime"),
      $"cameraevents.Country".cast("string").alias("Country"),
      $"cameraevents.Make".cast("string").alias("Make")
    )
    .withWatermark("EventTime", "10 seconds")
val groupedDataFrame =
  cameraMessages
    .groupBy(window($"EventTime", "5 seconds") as 'window)
    .agg(count("*") as 'count)
    .select($"window".getField("start") as 'window, $"count")
display(groupedDataFrame)

val makeDataFrame =
  cameraMessages
    .groupBy("Make")
    .agg(count("*") as 'count)
    .sort($"count".desc)
display(makeDataFrame)

val countryDataFrame =
  cameraMessages
    .groupBy("Country")
    .agg(count("*") as 'count)
    .sort($"count".desc)
display(countryDataFrame)
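One possible way to keep a single consumer group is to let only one query read from the IoT Hub endpoint and fan out from an intermediate table. The sketch below assumes Databricks Delta is available on the cluster and reuses `cameraMessages` from the code above. `parsedPath` and `checkpointPath` are hypothetical locations, and the approach has not been verified against this exact setup.

```scala
import org.apache.spark.sql.functions.{col, count, desc, window}

// Hypothetical locations; replace with paths that exist in your workspace.
val parsedPath = "/tmp/camera-events/parsed"
val checkpointPath = "/tmp/camera-events/_checkpoint"

// The only query that reads from the Event Hub-compatible endpoint,
// so only one set of receivers is opened on the consumer group.
val ingest = cameraMessages.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpointPath)
  .start(parsedPath)

// Run the part below once the ingest query has committed its first batch;
// a streaming read fails if the Delta table does not exist yet.
val parsedEvents = spark.readStream
  .format("delta")
  .load(parsedPath)
  .withWatermark("EventTime", "10 seconds")

// The three display queries now read from the table instead of the hub.
display(
  parsedEvents
    .groupBy(window(col("EventTime"), "5 seconds"))
    .agg(count("*") as "count")
    .select(col("window").getField("start") as "window", col("count"))
)
display(parsedEvents.groupBy("Make").agg(count("*") as "count").sort(desc("count")))
display(parsedEvents.groupBy("Country").agg(count("*") as "count").sort(desc("count")))
```

The trade-off is extra latency and storage for the intermediate table, in exchange for defining the hub connection once and avoiding additional consumer groups.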