我正在处理包含地理坐标点的 json 对象。我想针对我在本地拥有的 postgis 服务器运行这些点,以评估多边形匹配中的点。
我希望用预先存在的处理器来做到这一点——我成功地使用“EvaluateJsonPath”处理器将纬度/经度坐标提取到属性中,并使用“ExecuteSQL”成功地向我的本地 postgis 数据存储发出查询。这给我留下了 avro 响应,然后我可以使用“ConvertAvroToJSON”处理器将其转换为 JSON。
我在如何将查询结果与原始 JSON 对象合并在一起时遇到概念问题。事实上,我有两个具有相同片段 ID 的流文件,理论上我可以将它们与“mergecontent”合并在一起,但这让我:
{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}
是否有任何建议的策略将 SQL 查询的结果合并到原始 json 结构中,所以我的结果将是这样的:
{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}
我正在运行 nifi 6.0、postgres 9.5.2 和 postgis 2.2.1。
我在https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.html中看到了一些关于使用 replaceText 处理器的参考- 但这似乎是将属性中的内容合并到正文中内容。我错过了将原始内容与 SQL 响应的内容或从没有内容的 SQL 响应中提取的属性合并的要点。
编辑:
下面的 Groovy 脚本似乎可以满足需要。我不是一个 groovy 编码器,所以欢迎任何改进。
import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.JsonSlurper
def flowFile = session.get();
if (flowFile == null) {
return;
}
def slurper = new JsonSlurper()
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def obj = slurper.parseText(text)
def originaljsontext = flowFile.getAttribute('original.json')
def originaljson = slurper.parseText(originaljsontext)
originaljson.put("point_polygon_info", obj)
outputStream.write(groovy.json.JsonOutput.toJson(originaljson).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)