I'm using a Jython InvokeScriptedProcessor to transform data from a JSON structure into an SQL structure. I'm having trouble with one specific function, json.loads: it does not handle special characters such as ñ, é, á, í...
It writes them in an odd form, and I have not found any way to get them written correctly.
E.g. (very simplified):
{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}
If we try to write it in sql like
INSERT INTO .... (id, value) VALUES ("...",3.141592);
It will fail, and it does fail for me. I cannot return the data correctly through either relationship, success or failure, regardless of the NiFi version. Here is my code:
def process(self, inputStream, outputStream):
    # read input json data from flowfile content
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    data = json.loads(text)
This alternative does not work properly either:
    data = json.loads(text.encode("utf-8"))
text arrives as unicode.
def __generate_sql_transaction(input_data):
    """ Generate SQL statement """
    sql = """
BEGIN;"""
    _id = input_data.get("id")
    _timestamp = input_data.get("timestamp")
    _flowfile_metrics = input_data.get("metrics")
    _flowfile_metadata = input_data.get("metadata")
    self.valid = __validate_metrics_type(_flowfile_metrics)
    if self.valid is True:
        self.log.error("generate insert")
        sql += """
INSERT INTO
{0}.{1} (id, timestamp, metrics""".format(schema, table)
        if _flowfile_metadata:
            sql += ", metadata"
        sql += """)
VALUES
('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))
        self.log.error("generate metadata")
        if _flowfile_metadata:
            sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
        sql += """)
ON CONFLICT ({})""".format(on_conflict)
        if not bool(int(self.update)):
            sql += """
DO NOTHING;"""
        else:
            sql += """
DO UPDATE
SET"""
            if bool(int(self.preference)):
                sql += """
metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
            else:
                sql += """
metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))
    else:
        return ""
    sql += """
COMMIT;"""
    return sql
I send the data back to NiFi with:
output = __generate_sql_transaction(data)
self.log.error("post generate_sql_transaction")
self.log.error(output.encode("utf-8"))
# If no sql_transaction is generated because requisites weren't met,
# set the processor output with the original flowfile input.
if output == "":
    output = text
# write new content to flowfile
outputStream.write(
    output.encode("utf-8")
)
The output looks like
INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');
I also have "Ñueco" in the metadata, and neither the id nor the metadata comes out correctly.
NOTE: It seems that InvokeScriptedProcessor works fine with Groovy instead of Python, but my problem is that I know nothing about Groovy...
Has anybody run into a similar issue? How did you solve it?
Update:
Input Example:
{
    "id":"ÑUECO",
    "metrics":{
        "value":3.1415
    },
    "metadata":{
        "location":"ÑUECO"
    },
    "timestamp":"2020-01-01 00:00:00 01:00"
}
Desired Output:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO',
'2020-01-01T00:00:00 01:00',
'{"value":3.1415}',
'{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
Real Output:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO',
'2020-01-01T00:00:00 01:00',
'{"value":3.1415}',
'{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
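For reference, the 'ÃUECO' in the real output is consistent with the UTF-8 bytes of "Ñ" being read back one byte per character. The following is a minimal sketch in plain Python (not NiFi-specific, and the variable names are only illustrative) that reproduces the same symptom; it assumes the bytes are reinterpreted as Latin-1, which is a guess about what happens inside Jython rather than something confirmed here.

```python
# "\u00d1UECO" is the id value from the example input (N-tilde + "UECO").
original = u"\u00d1UECO"

# Encoding to UTF-8 turns the single N-tilde character into two bytes: C3 91.
utf8_bytes = original.encode("utf-8")

# Assumption: something downstream treats each byte as one character.
# Decoding as Latin-1 models that. Byte C3 becomes U+00C3 (A-tilde) and
# byte 91 becomes U+0091, an invisible control character.
misread = utf8_bytes.decode("latin-1")

# Decoding with the right codec gives the original text back.
roundtrip = utf8_bytes.decode("utf-8")
```

Here misread is one character longer than original, and its visible part reads as A-tilde followed by "UECO", which matches the id in the real output.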
Answer:
Update:
Jython does not work correctly with byte strings, so don't use .encode('utf-8').
Use Java methods to write the content back to the flowfile with a specific encoding.
Below is a sample that correctly reads and writes non-ASCII characters, including Ñ.
Use an ExecuteScript processor with Jython and replace the body of the _transform(text) function:
import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class FlowWriter(StreamCallback):
    def _transform(self, text):
        # transform incoming text here
        return '@@@@' + text + '****'

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        new_text = self._transform(text)
        IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8)


flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, FlowWriter())
        flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        log.error("{}\n{}".format(e, traceback.format_exc()))
        session.rollback(True)  # put file back and penalize it
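As an illustration of what the body of _transform could look like for the data in the question, here is a rough sketch written as a standalone function so it can be tried outside NiFi. The function name transform and the table name my_schema.my_table are hypothetical, and the statement is reduced to a single INSERT. It assumes the goal is to keep everything as unicode text: no .encode() call, and json.dumps called with ensure_ascii=False so that Ñ is not turned into an escape sequence.

```python
import json


def transform(text, table=u"my_schema.my_table"):
    """Build a simplified INSERT statement from a JSON document.

    `table` is a hypothetical placeholder, not a name from the question.
    """
    data = json.loads(text)

    # ensure_ascii=False keeps non-ASCII characters as they are instead of
    # replacing them with \uXXXX escape sequences.
    metrics = json.dumps(data.get("metrics"), ensure_ascii=False)
    metadata = json.dumps(data.get("metadata"), ensure_ascii=False)

    # Unicode format string and no .encode(): the result stays text, and the
    # caller decides how to turn it into bytes.
    # Values are interpolated without escaping, as in the question's code.
    template = (u"INSERT INTO {0} (id, timestamp, metrics, metadata) "
                u"VALUES ('{1}', '{2}', '{3}', '{4}');")
    return template.format(table, data.get("id"), data.get("timestamp"),
                           metrics, metadata)
```

Inside FlowWriter, _transform would then simply return transform(text), and the IOUtils.write call in process takes care of encoding the result as UTF-8.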
Answer:
I recently found this answer:
https://stackoverflow.com/a/35882335/7634711
It's not a problem with NiFi. It's a problem with Python 2 and how it works with the json library. The same problem will also appear in Python 3 if special characters appear in dict keys.
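The \u00d1 seen in the metadata of the question's real output is what json.dumps produces by default, since it escapes every non-ASCII character unless told otherwise. A small sketch, independent of NiFi and using an illustrative metadata dict, shows the two forms side by side:

```python
import json

# Same shape as the "metadata" object in the question (N-tilde + "UECO").
metadata = {u"location": u"\u00d1UECO"}

# Default: non-ASCII characters are written as \uXXXX escape sequences.
escaped = json.dumps(metadata)

# ensure_ascii=False: characters are written as they are.
literal = json.dumps(metadata, ensure_ascii=False)

# Both strings are valid JSON and parse back to the same data.
same_data = json.loads(escaped) == json.loads(literal) == metadata
```

So the escaped form is a different spelling of the same JSON document; whether it is acceptable depends on whether the text has to be readable as stored.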