3

我只是使用 querycassandra 处理器查询 cassandra 表,但我不明白如何将我的 Json 输出文件作为输入文件传递到 ExecutePyspark 处理器,稍后我需要将我的 Spark 输出数据传递给 Hive。请帮我解决这个问题,谢谢。

我的查询 Cassandra 属性:

在此处输入图像描述

Pyspark 属性: 在此处输入图像描述

4

1 回答 1

5

考虑使用 4 个处理器的流程,如下所示:

QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark

第 1 步QueryCassandra处理器:在 Cassandra 上执行 CQL,并将结果输出到流文件中。

第 2 步UpdateAttribute处理器:为属性分配filename一个值,该值包含磁盘上将包含查询结果的临时文件的名称。使用NiFi 表达式语言生成文件名,以便每次运行都不同。创建一个属性result_directory并为 NiFi 具有写入权限的磁盘上的文件夹分配一个值。

  • 财产:filename
  • 价值:cassandra_result_${now():toNumber()}

  • 财产:result_directory

  • 价值:/tmp

在此处输入图像描述

第 3 步PutFile处理器:使用在第 2 步中填充Directory的值配置属性。${result_directory}

在此处输入图像描述

第 4 步ExecutePySpark处理器:通过PySpark App Args处理器属性将文件名及其位置作为参数传递给 PySpark 应用程序。然后,应用程序可以使用代码从磁盘上的文件中读取数据,对其进行处理并写入 Hive。

  • 财产:PySpark App Args
  • 价值:${result_directory}/${filename}

在此处输入图像描述

此外,您可以在步骤 2 (UpdateAttribute) 中配置更多属性,然后可以在步骤 4 (ExecutePySpark) 中将这些属性作为参数传递,并由 PySpark 应用程序在写入 Hive 时考虑(例如,Hive 数据库和表名)。

于 2018-03-14T21:32:23.193 回答