0

目前我遇到了以下问题:我正在使用 KafkaConsumer 读取来自 Kafka 主题的消息。消息是字符串并具有以下格式: { "a" : "b", "a1" : "b1", "c2" : "c3" } 它们保存在 FlowFile 的有效负载中。

我想将该字符串转换为 json 或理想情况下转换为 csv,但不知道该怎么做。

我是 NiFi 的新手并尽可能多地进行了研究,但我发现的答案是关于从 json 到 avro 或类似的转换,但从不将字符串转换为 json 或 avro。我还发现 Kafka 消息在 FlowFile 的有效负载中,而不是在属性中,所以我不知道如何获得它,因为示例总是涉及属性。

简而言之:我可以使用一些内置处理器将 FlowFile 的有效负载(一个字符串)转换为 json/cvs。

4

2 回答 2

0

如果您的消息在 FlowFile 中,则以下顺序可能会有所帮助:

1) 使用 AttributesToJson 将有效负载消息转换为 Json。2) 使用 EvaluateJsonPath 提取有效负载消息。在你的情况下,卡夫卡消息。然后您可以将提取的消息传递给 csv 生成。

这篇文章可以帮助将 Json 转换为 CSV:将 Json 转换为 CSV

于 2017-08-16T20:26:21.697 回答
0

我最终这样做了:

  1. ConsumeKafka 给了我字符串:

{ "a" : "b", "a1" : "b1" }

  1. EvaluateJsonPath 通过添加属性来创建属性

a -> $.a //results in attribute named a with value b

a1 -> $.a1 //results in attribute named a1 with value b1

  1. ReplaceText 从 EvaluateJsonPath 获取属性以形成一个单独的 csv 格式:

Replacement value -> ${'a'},${'a1'}

这导致单行,但没有新行:

b,b1

要添加附加\n'\n'的新行,"\n" 不起作用起作用的是在“替换值”字段中键入时按Shift+Enter,这会导致创建一个空的新行。

于 2017-08-23T06:37:30.907 回答