I am trying to flatten a nested JSON document in Apache NiFi, using the JoltTransformJSON processor with a Jolt spec.
Sample JSON:
Input
{
"status": 0,
"body": {
"updatetime": 1667946041,
"timezone": "Europe/Luxembourg",
"measuregrps": [
{
"grpid": 3807008748,
"attrib": 0,
"date": 1660928128,
"created": 1660928165,
"modified": 1660928167,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"measures": [
{
"value": 62,
"type": 11,
"unit": 0,
"algo": 16909313,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 63,
"type": 135,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 143,
"type": 136,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 396,
"type": 137,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 402,
"type": 138,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
}
],
"comment": null
},
{
"grpid": 3801487301,
"attrib": 0,
"date": 1660725664,
"created": 1660725667,
"modified": 1660725667,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"measures": [
{
"value": 968,
"type": 54,
"unit": -1,
"algo": 33619969,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
}
],
"comment": null,
"is_inconclusive": false
},
{
"grpid": 3801485699,
"attrib": 0,
"date": 1660725563,
"created": 1660725596,
"modified": 1660725597,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"measures": [
{
"value": 72,
"type": 11,
"unit": 0,
"algo": 16909313,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 60,
"type": 135,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 150,
"type": 136,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 366,
"type": 137,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 400,
"type": 138,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
}
],
"comment": null
},
{
"grpid": 3799078577,
"attrib": 2,
"date": 1660636784,
"created": 1660636789,
"modified": 1660636789,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"measures": [
{
"value": 61000,
"type": 1,
"unit": -3,
"algo": 0,
"fm": 255,
"apppfmid": 7,
"appliver": 5110101
}
],
"comment": null
},
{
"grpid": 3781281953,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"measures": [
{
"value": 6200,
"type": 1,
"unit": -2,
"algo": 0,
"fm": 255
}
],
"comment": null
},
{
"grpid": 3781281952,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"measures": [
{
"value": 1670,
"type": 4,
"unit": -3,
"algo": 0,
"fm": 255
}
],
"comment": null
}
]
}
}
Jolt Spec
[
{
// Single shift that tries to emit one output object per array index.
"operation": "shift",
"spec": {
"body": {
"measuregrps": {
"*": {
"measures": {
"*": {
// At this depth &1 is the index inside "measures", not the index of the
// enclosing measuregrps element, so the n-th measure of every group is
// written to the same output element. This is why the Output shown after
// this spec contains arrays such as "value": [62, 968, 72, ...].
"*": "[&1].&"
}
},
// Copy the root-level and body-level fields into each output element;
// at this depth &1 is the index of the measuregrps element.
"@(3,status)": "[&1].status",
"@(2,updatetime)": "[&1].updatetime",
"@(2,timezone)": "[&1].timezone",
// Copy every remaining attribute of the group (grpid, date, comment, ...).
"*": "[&1].&"
}
}
}
}
}
]
Output
[
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3807008748,
"attrib": 0,
"date": 1660928128,
"created": 1660928165,
"modified": 1660928167,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"value": [
62,
968,
72,
61000,
6200,
1670
],
"type": [
11,
54,
11,
1,
1,
4
],
"unit": [
0,
-1,
0,
-3,
-2,
-3
],
"algo": [
16909313,
33619969,
16909313,
0,
0,
0
],
"fm": [
255,
255,
255,
255,
255,
255
],
"apppfmid": [
9,
9,
9,
7
],
"appliver": [
2421,
2421,
2421,
5110101
],
"comment": null
},
{
"value": [
63,
60
],
"type": [
135,
135
],
"unit": [
0,
0
],
"algo": [
0,
0
],
"fm": [
255,
255
],
"apppfmid": [
9,
9
],
"appliver": [
2421,
2421
],
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3801487301,
"attrib": 0,
"date": 1660725664,
"created": 1660725667,
"modified": 1660725667,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment": null,
"is_inconclusive": false
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3801485699,
"attrib": 0,
"date": 1660725563,
"created": 1660725596,
"modified": 1660725597,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment": null
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3799078577,
"attrib": 2,
"date": 1660636784,
"created": 1660636789,
"modified": 1660636789,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"comment": null
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3781281953,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"comment": null
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3781281952,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"comment": null
}
]
Expected Output
[ {
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3807008748,
"attrib" : 0,
"date" : 1660928128,
"created" : 1660928165,
"modified" : 1660928167,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"value" : 62,
"type" : 11,
"unit" : 0,
"algo" : 16909313,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"comment" : null
}, {
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3807008748,
"attrib" : 0,
"date" : 1660928128,
"created" : 1660928165,
"modified" : 1660928167,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"value" : 968,
"type" : 54,
"unit" : -1,
"algo" : 33619969,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"comment" : null
},
{
"value" : 63,
"type" : 135,
"unit" : 0,
"algo" : 0,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3801487301,
"attrib" : 0,
"date" : 1660725664,
"created" : 1660725667,
"modified" : 1660725667,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment" : null,
"is_inconclusive" : false
},
{
"value" : 60,
"type" : 135,
"unit" : 0,
"algo" : 0,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3801487301,
"attrib" : 0,
"date" : 1660725664,
"created" : 1660725667,
"modified" : 1660725667,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment" : null,
"is_inconclusive" : false
}
]
My goal is to flatten the JSON so that each output object holds single values (one object per measure) instead of arrays. After that I plan to pass the result to the ConvertJSONToSQL processor in NiFi to insert the records into a PostgreSQL database.
https://jolt-demo.appspot.com/
Also, what does is_inconclusive
mean? I can see that it gets added to my output when using the Jolt transform.
CodePudding user response:
You can group the values by grpid
together with the index of each element of the measures
array (while looping over that array's elements), such as
[
  {
    // Step 1: build one object per measure, temporarily grouped as
    //   { "<grpid>": [ { ...measure 0... }, { ...measure 1... }, ... ] }
    // The innermost "*" loops over every attribute of a measure (value, type, ...),
    // so each copied field is written once per attribute and ends up as an array
    // of repeated values; step 3 collapses those arrays again.
    "operation": "shift",
    "spec": {
      "body": {
        "measuregrps": {
          "*": {
            "measures": {
              "*": {
                "*": {
                  // Target path for every entry: "@(4,grpid)" is the grpid of the
                  // enclosing group and "[&2]" is the index of the current measure.
                  // Root-level and body-level fields.
                  "@(6,status)": "@(4,grpid)[&2].status",
                  "@(5,updatetime)": "@(4,grpid)[&2].updatetime",
                  "@(5,timezone)": "@(4,grpid)[&2].timezone",
                  // Fields of the enclosing measuregrps element.
                  "@(3,grpid)": "@(4,grpid)[&2].grpid",
                  "@(3,attrib)": "@(4,grpid)[&2].attrib",
                  "@(3,date)": "@(4,grpid)[&2].date",
                  "@(3,created)": "@(4,grpid)[&2].created",
                  "@(3,modified)": "@(4,grpid)[&2].modified",
                  "@(3,category)": "@(4,grpid)[&2].category",
                  "@(3,deviceid)": "@(4,grpid)[&2].deviceid",
                  "@(3,hash_deviceid)": "@(4,grpid)[&2].hash_deviceid",
                  "@(3,comment)": "@(4,grpid)[&2].comment",
                  // Only present on some groups in the sample input.
                  "@(3,is_inconclusive)": "@(4,grpid)[&2].is_inconclusive",
                  // The measure's own attribute, stored under its original key.
                  "@": "@(4,grpid)[&2].&"
                }
              }
            }
          }
        }
      }
    }
  },
  {
    // Step 2: get rid of the grpid object labels, leaving a flat list of objects.
    "operation": "shift",
    "spec": {
      "*": {
        "*": ""
      }
    }
  },
  {
    // Step 3: keep only one component of each array of repeated values.
    "operation": "cardinality",
    "spec": {
      "*": {
        "*": "ONE"
      }
    }
  }
]
which will yield a flattened result as needed.
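P.S.: I haven't encountered is_inconclusive
within the Jolt context. Note that it already appears in the sample input (in the second measuregrps element), so it is copied from the input rather than added by Jolt.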