I have a need to get data out of a NiFi flow file with somewhat complex JSON content. I'm planning to use a NiFi ExecuteScript
processor because I don't think it can be done with EvaluateJSONPath
. The content looks like this (snippet)
...
"segments": [
{
"INS01": "Y",
"INS03": "001",
"INS02": "18",
"INS05": "A",
"id": "INS",
"INS04": "AI",
"INS08": "FT"
},
{
"REF02": "1041558xxxxx",
"REF01": "0F",
"id": "REF"
},
{
"REF02": "ABD",
"REF01": "1L",
"id": "REF"
},
{
"REF02": "106835xxxxx",
"REF01": "23",
"id": "REF"
}
],
...
I want to extract the REF02
property value from the segments array element that has REF01 === '0F'
. The array element does not necessarily have a REF02
property. So in the above case I should get 1041558xxxxx
.
Here's my current script:
var flowFile = session.get()
if (flowFile != null) {
var InputStreamCallback = Java.type('org.apache.nifi.processor.io.InputStreamCallback')
var IOUtils = Java.type('org.apache.commons.io.IOUtils')
var StandardCharsets = Java.type('java.nio.charset.StandardCharsets')
try {
var subscriber = null
session.read(flowFile,
new InputStreamCallback(function (inputStream) {
var data = JSON.parse(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
var segment = data.segments.find(function (s) { return s.hasOwnProperty('REF01') && s.REF01 === '0F' })
subscriber = segment ? segment.REF02 : null
}));
session.putAttribute(flowFile, 'subscriber', subscriber ? subscriber : '')
session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
log.error('script failed', e)
session.transfer(flowFile, REL_FAILURE)
}
}
When I execute the above, I get a java.lang.NoSuchMethodException
. Also, are anonymous 'arrow' functions allow?
I've tried using an old-school for
loop to no avail.
Thanks for your help.
CodePudding user response:
You can add a JoltTransformJSON processor with specification
[
{
"operation": "shift",
"spec": {
"segments": {
"*": {
"REF01": {
"0F": {// conditional to match "REF01" with "0F"
"@2,REF02": ""// go two levels up the three to reach the level of the attributes REF01 or REF02
}
}
}
}
}
}
]
in order to return the result
"1041558xxxxx"
CodePudding user response:
Groovy script:
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
flowFile = session.get()
if(!flowFile) return
def jsonSlurper = new JsonSlurper()
def subscriber = ""
flowFile = session.write(flowFile, {inputStream, outputStream ->
input = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
json = jsonSlurper.parseText(input)
segment = json.segments.find{ segment ->
if (segment.keySet().contains('REF01')) {
if (segment.REF01 == '0F') {
return true
} else {
return false
}
} else {
return false
}
}
if (segment) {
subscriber = segment.REF02
}
outputStream.write(input.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.putAttribute(flowFile, 'subscriber', subscriber)
session.transfer(flowFile, REL_SUCCESS)
input:
{
"test": "best",
"segments": [
{
"INS01": "Y",
"INS03": "001",
"INS02": "18",
"INS05": "A",
"id": "INS",
"INS04": "AI",
"INS08": "FT"
},
{
"REF02": "1041558xxxxx",
"REF01": "0F",
"id": "REF"
},
{
"REF02": "ABD",
"REF01": "1L",
"id": "REF"
},
{
"REF02": "106835xxxxx",
"REF01": "23",
"id": "REF"
}
]
}
output (with attribute subscriber
: 1041558xxxxx
):
{
"test": "best",
"segments": [
{
"INS01": "Y",
"INS03": "001",
"INS02": "18",
"INS05": "A",
"id": "INS",
"INS04": "AI",
"INS08": "FT"
},
{
"REF02": "1041558xxxxx",
"REF01": "0F",
"id": "REF"
},
{
"REF02": "ABD",
"REF01": "1L",
"id": "REF"
},
{
"REF02": "106835xxxxx",
"REF01": "23",
"id": "REF"
}
]
}