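I need to use Apache NiFi to split the JSON objects in a JSON array into separate JSON files and publish each one to a Kafka topic. Sample input is shown below; the array contains multiple JSON objects.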

[
{
    "stops": "1 Stop",
    "ticket price": "301.20",
    "days to departure": -1,
    "date of extraction": "03/22/2019",
    "departure": ", Halifax",
    "arrival": ", Toronto",
    "flight duration": "0 days 3 hours 58 minutes",
    "airline": "Porter Airlines",
    "plane": "DE HAVILLAND DHC-8 DASH 8-400 DASH 8Q",
    "timings": [
        {
            "departure_airport": "Halifax, NS, Canada (YHZ-Stanfield Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "6:40pm",
            "arrival_airport": "Ottawa, ON, Canada (YOW-Macdonald-Cartier Intl.)",
            "arrival_time": "7:58pm"
        },
        {
            "departure_airport": "Ottawa, ON, Canada (YOW-Macdonald-Cartier Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "8:30pm",
            "arrival_airport": "Toronto, ON, Canada (YTZ-Billy Bishop Toronto City)",
            "arrival_time": "9:38pm"
        }
    ],
    "plane code": "DH4",
    "id": "8e6c69c8-65e0-4f1b-b540-ae61abf8aa6d"
},
{
    "stops": "Nonstop",
    "ticket price": "390.95",
    "days to departure": -1,
    "date of extraction": "03/22/2019",
    "departure": ", Halifax",
    "arrival": ", Toronto",
    "flight duration": "0 days 2 hours 35 minutes",
    "airline": "Air Canada",
    "plane": "Boeing 767-300",
    "timings": [
        {
            "departure_airport": "Halifax, NS, Canada (YHZ-Stanfield Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "7:40pm",
            "arrival_airport": "Toronto, ON, Canada (YYZ-Pearson Intl.)",
            "arrival_time": "9:15pm"
        }
    ],
    "plane code": "763",
    "id": "fc13c5cb-93d1-46f9-b496-abbf6faba85a"
},
{
    "stops": "Nonstop",
    "ticket price": "391.33",
    "days to departure": -1,
    "date of extraction": "03/22/2019",
    "departure": ", Halifax",
    "arrival": ", Toronto",
    "flight duration": "0 days 2 hours 30 minutes",
    "airline": "WestJet",
    "plane": "BOEING 737-700 (WINGLETS) PASSENGER",
    "timings": [
        {
            "departure_airport": "Halifax, NS, Canada (YHZ-Stanfield Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "7:10pm",
            "arrival_airport": "Toronto, ON, Canada (YYZ-Pearson Intl.)",
            "arrival_time": "8:40pm"
        }
    ],
    "plane code": "73W",
    "id": "4d49c24b-6fb0-4f45-ba05-a3969ce7308a"
}
]

Desired output: a single JSON object like the one below. I want to publish each JSON object to the Kafka topic.

{
        "stops": "Nonstop",
        "ticket price": "390.95",
        "days to departure": -1,
        "date of extraction": "03/22/2019",
        "departure": ", Halifax",
        "arrival": ", Toronto",
        "flight duration": "0 days 2 hours 35 minutes",
        "airline": "Air Canada",
        "plane": "Boeing 767-300",
        "timings": [
            {
                "departure_airport": "Halifax, NS, Canada (YHZ-Stanfield Intl.)",
                "departure_date": "03/22/2019",
                "departure_time": "7:40pm",
                "arrival_airport": "Toronto, ON, Canada (YYZ-Pearson Intl.)",
                "arrival_time": "9:15pm"
            }
        ],
        "plane code": "763",
        "id": "fc13c5cb-93d1-46f9-b496-abbf6faba85a"
    }

2 Answers

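You can use the SplitJson processor, which splits a JSON array into individual messages and writes each one as the content of its own flow file. For example, if your JSON array contains 100 messages, the SplitJson processor's split relationship will output 100 flow files, each containing one message.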

The JsonPath expression is $.*

https://community.hortonworks.com/questions/183055/need-to-display-each-element-of-array-in-a-separat.html
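
To make the expected result concrete, here is a minimal Python sketch of what that split produces. It is not NiFi code; it only mimics the outcome of applying `$.*` to a top-level array. The function name `split_json_array` is made up for illustration, and the sample records are trimmed to two fields each.

```python
import json


def split_json_array(text):
    """Return one JSON string per element of a top-level JSON array.

    Hypothetical helper: it imitates the result of splitting on `$.*`,
    where every array element becomes its own message.
    """
    records = json.loads(text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # Serialize each element on its own, as each flow file would hold it.
    return [json.dumps(record) for record in records]


# Trimmed version of the sample input from the question.
sample = """
[
  {"airline": "Porter Airlines", "id": "8e6c69c8-65e0-4f1b-b540-ae61abf8aa6d"},
  {"airline": "Air Canada", "id": "fc13c5cb-93d1-46f9-b496-abbf6faba85a"},
  {"airline": "WestJet", "id": "4d49c24b-6fb0-4f45-ba05-a3969ce7308a"}
]
"""

messages = split_json_array(sample)
for message in messages:
    print(message)
```

Each printed line corresponds to the content of one flow file, which can then be routed to a Kafka publishing processor.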

Answered 2019-03-22T19:07:59.190

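This is an old post, but I still want to add my suggestion. First, @OneCricketeer is correct that you have to use the SplitJson processor, but the expression you give it matters a great deal.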

Based on the JSON provided by @Meghashaym, I suggest wrapping the array in an object, like this:

{"payload":[
{
    "stops": "1 Stop",
    "ticket price": "301.20",
    "days to departure": -1,
    "date of extraction": "03/22/2019",
    "departure": ", Halifax",
    "arrival": ", Toronto",
    "flight duration": "0 days 3 hours 58 minutes",
    "airline": "Porter Airlines",
    "plane": "DE HAVILLAND DHC-8 DASH 8-400 DASH 8Q",
    "timings": [
        {
            "departure_airport": "Halifax, NS, Canada (YHZ-Stanfield Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "6:40pm",
            "arrival_airport": "Ottawa, ON, Canada (YOW-Macdonald-Cartier Intl.)",
            "arrival_time": "7:58pm"
        },
        {
            "departure_airport": "Ottawa, ON, Canada (YOW-Macdonald-Cartier Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "8:30pm",
            "arrival_airport": "Toronto, ON, Canada (YTZ-Billy Bishop Toronto City)",
            "arrival_time": "9:38pm"
        }
    ],
    "plane code": "DH4",
    "id": "8e6c69c8-65e0-4f1b-b540-ae61abf8aa6d"
},
{
    "stops": "Nonstop",
    "ticket price": "390.95",
    "days to departure": -1,
    "date of extraction": "03/22/2019",
    "departure": ", Halifax",
    "arrival": ", Toronto",
    "flight duration": "0 days 2 hours 35 minutes",
    "airline": "Air Canada",
    "plane": "Boeing 767-300",
    "timings": [
        {
            "departure_airport": "Halifax, NS, Canada (YHZ-Stanfield Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "7:40pm",
            "arrival_airport": "Toronto, ON, Canada (YYZ-Pearson Intl.)",
            "arrival_time": "9:15pm"
        }
    ],
    "plane code": "763",
    "id": "fc13c5cb-93d1-46f9-b496-abbf6faba85a"
},
{
    "stops": "Nonstop",
    "ticket price": "391.33",
    "days to departure": -1,
    "date of extraction": "03/22/2019",
    "departure": ", Halifax",
    "arrival": ", Toronto",
    "flight duration": "0 days 2 hours 30 minutes",
    "airline": "WestJet",
    "plane": "BOEING 737-700 (WINGLETS) PASSENGER",
    "timings": [
        {
            "departure_airport": "Halifax, NS, Canada (YHZ-Stanfield Intl.)",
            "departure_date": "03/22/2019",
            "departure_time": "7:10pm",
            "arrival_airport": "Toronto, ON, Canada (YYZ-Pearson Intl.)",
            "arrival_time": "8:40pm"
        }
    ],
    "plane code": "73W",
    "id": "4d49c24b-6fb0-4f45-ba05-a3969ce7308a"
}
]}

Now I am using Jsonpath finder to inspect the JSON structure. When we click the payload object, we can see the array items under the path x.payload.

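In this case you can use $.payload[*] as the expression in the processor, and set the Execution option under the Scheduling tab to Primary node. This should enqueue the individual items in the queue listing. So basically we are parsing out each element of the array object.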
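
As a rough illustration of what `$.payload[*]` selects, the Python sketch below pulls the elements out of the wrapped object and compares them with the elements of the original bare array. It is only an approximation of the path's behaviour, not NiFi or a JsonPath library; `select_payload_items` is a hypothetical name and the records are trimmed to a single field.

```python
import json


def select_payload_items(text, key="payload"):
    """Return the elements of the array stored under `key`.

    Hypothetical helper that mirrors what `$.payload[*]` selects:
    every element of the array nested under the `payload` field.
    """
    wrapper = json.loads(text)
    items = wrapper[key]
    if not isinstance(items, list):
        raise ValueError(f"expected an array under {key!r}")
    return items


# Trimmed versions of the wrapped and unwrapped sample documents.
wrapped = '{"payload": [{"plane code": "DH4"}, {"plane code": "763"}, {"plane code": "73W"}]}'
unwrapped = '[{"plane code": "DH4"}, {"plane code": "763"}, {"plane code": "73W"}]'

items = select_payload_items(wrapped)
for item in items:
    print(json.dumps(item))

# The wrapped form yields the same elements as the bare array.
same_elements = items == json.loads(unwrapped)
```

In this sketch the wrapper changes only the path needed to reach the array, not the records that come out of the split.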

Answered 2021-01-22T07:56:34.940