Python 2.X on NiFi: problems with Ñ (and others) in json.loads


I'm using a Jython InvokeScriptedProcessor to transform data from a JSON structure into SQL statements, and I'm having trouble with one specific call: json.loads. It does not handle special characters like ñ, é, á, í...

It writes them out in a mangled form, and I haven't found a way to fix it.

e.g. (very simple)

{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}

If we try to write it in sql like

INSERT INTO .... (id, value) VALUES ("...",3.141592);

it will fail. I cannot return the data through any output relationship, success or failure, regardless of the NiFi version. Here is my code:

    def process(self, inputStream, outputStream):
        # read input json data from flowfile content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        data = json.loads(text) 

Neither

        data = json.loads(text.encode("utf-8"))

works properly. text arrives as a unicode string.
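For what it's worth, output like "ÃUECO" is the classic signature of UTF-8 bytes being re-decoded with a single-byte encoding such as Latin-1: Ñ becomes the two bytes 0xC3 0x91, which read back as "Ã" plus an invisible control character. A minimal pure-Python sketch of the mechanism (illustrative only, not NiFi-specific):

```python
# -*- coding: utf-8 -*-

# "Ñ" encoded as UTF-8 is two bytes: 0xC3 0x91.
utf8_bytes = u"ÑUECO".encode("utf-8")

# Re-decoding those bytes as Latin-1 splits each accented letter
# into two characters: "Ã" plus an invisible C1 control character.
mojibake = utf8_bytes.decode("latin-1")

print(mojibake)       # renders as "ÃUECO" in most terminals
print(len(mojibake))  # 6 -- one extra character for the accented letter
```

So the double .encode("utf-8") in the script is a likely suspect: it produces bytes that something downstream decodes with the wrong charset.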

    def __generate_sql_transaction(input_data):
        """ Generate SQL statement """

        sql = """
    BEGIN;"""

        _id = input_data.get("id")
        _timestamp = input_data.get("timestamp")
        _flowfile_metrics = input_data.get("metrics")
        _flowfile_metadata = input_data.get("metadata")

        self.valid = __validate_metrics_type(_flowfile_metrics)

        if self.valid is True:
            self.log.error("generate insert")
            sql += """
    INSERT INTO
        {0}.{1} (id, timestamp, metrics""".format(schema, table)

            if _flowfile_metadata:
                sql += ", metadata"
            sql += """)
    VALUES
        ('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))

            self.log.error("generate metadata")
            if _flowfile_metadata:
                sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
            sql += """)
    ON CONFLICT ({})""".format(on_conflict)

            if not bool(int(self.update)):
                sql += """
    DO NOTHING;"""
            else:
                sql += """
    DO UPDATE
        SET"""
                if bool(int(self.preference)):
                    sql += """
            metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
                else:
                    sql += """
            metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))

        else:
            return ""

        sql += """
    COMMIT;"""
        return sql

I send the data to NiFi again with:

        output = __generate_sql_transaction(data)
        self.log.error("post generate_sql_transaction")
        self.log.error(output.encode("utf-8"))

        # If no sql_transaction is generated because requisites weren't met,
        # set the processor output with the original flowfile input.
        if output == "":
            output = text

        # write new content to flowfile
        outputStream.write(
            output.encode("utf-8")
        )

That output seems like

INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');
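The \u00d1 escapes come from json.dumps, which defaults to ensure_ascii=True and escapes every non-ASCII character. Passing ensure_ascii=False keeps the literal characters (a plain-Python sketch, independent of NiFi):

```python
import json

data = {"location": "ÑUECO"}

# Default: every non-ASCII character is escaped.
print(json.dumps(data))                      # {"location": "\u00d1UECO"}

# ensure_ascii=False preserves the characters as-is.
print(json.dumps(data, ensure_ascii=False))  # {"location": "ÑUECO"}
```

Note that on Python 2, ensure_ascii=False makes json.dumps return a unicode string, which then has to be written out with an explicit encoding.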

I also have "Ñueco" in the metadata, and it works correctly with neither id nor metadata.

NOTE: InvokeScriptedProcessor seems to work fine when using Groovy instead of Python, but my problem is that I know nothing about Groovy...

Has anybody found a similar issue? How did you solve it?

Update:

Input Example:

{"id":"ÑUECO",
 "metrics":{
     "value":3.1415
 },
 "metadata":{
     "location":"ÑUECO"
 },
 "timestamp":"2020-01-01 00:00:00 01:00"
}

Desired Output:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO', 
        '2020-01-01T00:00:00 01:00',
        '{"value":3.1415}',
        '{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
    SET
        metrics='{"value":3.1415}' || Table.metrics;
COMMIT;

Real Output:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO', 
        '2020-01-01T00:00:00 01:00',
        '{"value":3.1415}',
        '{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
    SET
        metrics='{"value":3.1415}' || Table.metrics;
COMMIT;

Answer:

Update:

  1. Jython does not handle byte strings correctly, so don't use .encode('utf-8').

  2. Use Java methods to write the content back to the flowfile with a specific encoding.

Below is a sample that reads and writes non-ASCII characters, including Ñ, correctly.

Use an ExecuteScript processor with Jython and replace the body of the _transform(text) function:

import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class FlowWriter(StreamCallback):
    def _transform(self, text):
        # transform incoming text here
        return '@@@@' + text + '****'

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        new_text = self._transform(text)
        IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8)

flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, FlowWriter())
        flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        log.error("{}\n{}".format(e,traceback.format_exc()))
        session.rollback(True)  # put file back and penalize it

Answer:

I've recently found this answer.

https://stackoverflow.com/a/35882335/7634711

It's not a problem with NiFi; it's a problem with Python 2 and how its json library works. The problem also exists in Python 3 if special characters appear in dict keys.
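For comparison, on Python 3 json.loads/json.dumps round-trip non-ASCII values cleanly, as long as nothing re-encodes the string in between (a quick sketch):

```python
import json

raw = '{"id": "ÑUECO", "metadata": {"location": "ÑUECO"}}'
data = json.loads(raw)

print(data["id"])                                   # ÑUECO
print(json.dumps(data, ensure_ascii=False) == raw)  # True
```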
