1

我正在尝试使用 Spark Structured Streaming 处理一些事件。

传入事件如下所示:

事件一:

网址
http://first/path/to/read/from...

活动二:

网址
http://second/path/to/read/from...

等等。

我的目标是阅读每个网址并生成一个新的 DF。到目前为止,我已经用这样的代码完成了它,我做了一个collect().


def createDF(url):

    file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url

    """ Read data """
    binary = spark.read.format("binaryFile").load(file_url)
    """ Do other operations """
    ...

    """ save the data """
    # write it into blob again

    return something

def loadData(batchDf, batchId):

    
    """
    batchDf:
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
        |                body|partition|     offset|sequenceNumber|        enqueuedTime|publisher|partitionKey|          properties|systemProperties|                 url|
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
        |[{"topic":"/subsc...|        0|30084343744|         55489|2021-03-03 14:21:...|     null|        null|[aeg-event-type -...|              []|http://path...|
        +--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
    """

    """ Before .... 

    df = batchDf.select("url")
    url = df.collect()

    [createDF(item) for item in url]
    """
    # Now without collect()
    # Select the url field of the df
    url_select_df = batchDf.select("url")

    # Read url value
    result = url_select_df.rdd.map(lambda x: createDF(x.url))
  
query  = df \
    .writeStream \
    .foreachBatch(loadData) \
    .outputMode("update") \
    .queryName("test") \
    .start() \
    .awaitTermination()

但是,当我想在不收集的情况下提取 URL 时,我收到以下错误消息:

您似乎正在尝试从广播中引用 SparkContext

会发生什么?

非常感谢您的帮助

4

1 回答 1

1

没有collectDataframe的调用url_select_df分布在执行者之间。然后当您调用map时,lambda 表达式将在执行程序上执行。因为正在调用使用 SparkContext 的 lambda 表达式createDF,所以您会收到异常,因为无法在执行程序上使用 SparkContext。

看起来您已经找到了collect将数据帧应用于驱动程序的解决方案,并在那里应用了 lambda 表达式。

只要确保您没有超载(基于内存)您的驱动程序。

于 2021-03-03T15:07:51.987 回答