I am trying to process some events with Spark Structured Streaming.
The incoming events look like this:

Event 1:

| url |
|---|
| http://first/path/to/read/from... |

Event 2:

| url |
|---|
| http://second/path/to/read/from... |

And so on.
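
For context, `df` in the streaming query further down is the DataFrame I read from Azure Event Hubs (the Event Grid metadata such as `aeg-event-type` shows up in the batch sample below). A minimal sketch of how it is built, assuming the azure-eventhubs-spark connector; the connection string and the JSON path used to extract the URL are placeholders, not my real values:

```python
from pyspark.sql.functions import col, get_json_object

# Sketch of the streaming source (assumption: azure-eventhubs-spark connector;
# connection_string and the JSON path below are placeholders).
ehConf = {"eventhubs.connectionString": connection_string}

df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
    # body arrives as binary; decode it and pull the blob URL out of the Event Grid payload
    .withColumn("url", get_json_object(col("body").cast("string"), "$[0].data.url"))
)
```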
My goal is to read each url and generate a new DataFrame from it. So far I have done this with code like the following, where I do a collect():
```python
def createDF(url):
    file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url

    # Read the data
    binary = spark.read.format("binaryFile").load(file_url)

    # Do other operations
    ...

    # Save the data: write it back into blob storage
    return something
```
```python
def loadData(batchDf, batchId):
    """
    batchDf:
    +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
    |                body|partition|     offset|sequenceNumber|        enqueuedTime|publisher|partitionKey|          properties|systemProperties|                 url|
    +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
    |[{"topic":"/subsc...|        0|30084343744|         55489|2021-03-03 14:21:...|     null|        null|[aeg-event-type -...|              []|      http://path...|
    +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
    """
    """ Before ....
    df = batchDf.select("url")
    url = df.collect()
    [createDF(item) for item in url]
    """
    # Now without collect()
    # Select the url field of the df
    url_select_df = batchDf.select("url")
    # Read url value
    result = url_select_df.rdd.map(lambda x: createDF(x.url))


query = df \
    .writeStream \
    .foreachBatch(loadData) \
    .outputMode("update") \
    .queryName("test") \
    .start() \
    .awaitTermination()
```
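
For reference, the earlier collect() variant (the commented-out block inside `loadData`) did work when written out as the actual foreachBatch body, reading the url field off each collected Row:

```python
def loadData(batchDf, batchId):
    # Driver-side variant that worked: collect the url values to the driver
    # and call createDF for each one.
    urls = [row.url for row in batchDf.select("url").collect()]
    for u in urls:
        createDF(u)
```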
However, when I try to extract the URLs without collect(), I get the following error:

It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation.

What is happening here?
Thanks a lot for any help.