1

我正在尝试创建一个数据工厂管道,其中一个活动将文件名称(来自容器或其他文件夹)一一注入数据块活动中,以按传入顺序进行处理。我如何实现它?

4

2 回答 2

0

您好 Surbhi Tayal,感谢您的询问。按传入顺序,我假设您的意思是第一个发送到数据块的应该是第一个完成的,而不是并行处理。

为此,您将需要以下内容:

  • 数组类型的管道变量。
  • 一种用文件名填充数组变量的机制。它可能是 GetMetadata 活动或管道参数或其他东西。如果您需要这方面的帮助,请添加更多详细信息。
  • Databricks 资源和链接服务

在您的管道中创建一个 ForEach 活动。在设置中标记“顺序”选项。如果不这样做,将导致您的活动被并行发送,而不是一个接一个地发送。在设置中,引用“项目”中的数组变量。表情看起来像@variables('myVariableName')。在 ForEach 活动的活动中,放置一个 Databricks 类型的活动。选项是“笔记本”、“Jar”和“Python”。对于我来说,我用的是笔记本。由于 UI 的“浏览”选项,Notebook 更易于设置。将活动设置为首先使用适当的链接服务。设置“Python 文件”/“笔记本路径”/“主类名”。展开“参数”部分并​​添加一个新参数。为参数提供与 Databricks 脚本中相同的名称。@string(item())(如果您的可枚举不是简单的原语数组,可能会有所不同)。这会从 ForEach 活动中获取项目并确保它是一个字符串。如有必要,设置库。

当您尝试运行/调试时,请注意 Databricks 可能需要很长时间才能启动集群。这增加了管道运行时间。

于 2019-06-18T02:38:22.797 回答
0

根据源类型和文件被摄取的频率,它也可能是使用 spark 结构化流的一个选项。对于流数据源,还支持文件作为源 - 将写入目录中的文件作为数据流读取。支持的文件格式是文本、csv、json、orc、parquet(请参阅 DataStreamReader 接口的文档以获取最新列表以及每种文件格式支持的选项)。请注意,文件必须以原子方式放置在给定目录中,在大多数文件系统中,这可以通过文件移动操作来实现。

streamingInputDF = (
  spark
    .readStream           # Similar to Batch just using `readStream` instead of `read`
    .schema(jsonSchema)               
    .json(inputPath)
)

如果您不想永久运行笔记本,请使用 trigger once 选项。使用 trigger once 选项输出为可用数据写入一次,没有此选项输出流将永久运行:

streamingOutputDF \
    .coalesce(1) \
    .writeStream \
    .format("parquet") \
    .partitionBy('ingest_date') \
    .option("checkpointLocation", checkPointPath) \
    .option("path", targetPath) \
    .trigger(once=True) \
    .start()

在这种情况下,您可以使用数据工厂来触发不带参数的 Databricks 笔记本。

于 2019-06-18T05:28:34.263 回答