我正在使用 Jython InvokeScriptedProcessor 将数据从 json 结构构造到 sql 结构。我在使用特定功能时遇到问题。json.loads。json.loads 无法识别特殊字符,如 ñ、é、á、í...
它以一种奇怪的形式写出来。而且我还没有达到任何形式来拥有它。
例如(很简单)
{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}
如果我们尝试用 sql 编写它
INSERT INTO .... (id, value) VALUES ("...",3.141592);
它会失败。它让我失望。我无法使用任何返回选项返回数据,成功或失败,NiFi 的版本无关紧要。这是我的代码
def process(self, inputStream, outputStream):
# read input json data from flowfile content
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
data = json.loads(text)
两者都不
data = json.loads(text.encode("utf-8"))
工作正常。文本以 unicode 形式出现。
def __generate_sql_transaction(input_data):
""" Generate SQL statement """
sql = """
BEGIN;"""
_id = input_data.get("id")
_timestamp = input_data.get("timestamp")
_flowfile_metrics = input_data.get("metrics")
_flowfile_metadata = input_data.get("metadata")
self.valid = __validate_metrics_type(_flowfile_metrics)
if self.valid is True:
self.log.error("generate insert")
sql += """
INSERT INTO
{0}.{1} (id, timestamp, metrics""".format(schema, table)
if _flowfile_metadata:
sql += ", metadata"
sql += """)
VALUES
('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))
self.log.error("generate metadata")
if _flowfile_metadata:
sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
sql += """)
ON CONFLICT ({})""".format(on_conflict)
if not bool(int(self.update)):
sql += """
DO NOTHING;"""
else:
sql += """
DO UPDATE
SET"""
if bool(int(self.preference)):
sql += """
metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
else:
sql += """
metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))
else:
return ""
sql += """
COMMIT;"""
return sql
我再次将数据发送到 NiFi:
output = __generate_sql_transaction(data)
self.log.error("post generate_sql_transaction")
self.log.error(output.encode("utf-8"))
# If no sql_transaction is generated because requisites weren't met,
# set the processor output with the original flowfile input.
if output == "":
output = text
# write new content to flowfile
outputStream.write(
output.encode("utf-8")
)
那个输出看起来像
INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');
我在元数据中也有“Ñueco”,它不适用于 id 或元数据
注意:似乎 InvokeScriptedProcessor 使用 Groove 而不是 Python 可以正常工作。但我的问题是我对 Groovy 一无所知...
有没有人发现类似的问题?你是怎么解决的?
更新:
输入示例:
{"id":"ÑUECO",
"metrics":{
"value":3.1415
},
"metadata":{
"location":"ÑUECO"
},
"timestamp":"2020-01-01 00:00:00+01:00"
}
期望的输出:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO',
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
实际输出:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO',
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;