I have a large JSON file like the following:
[(3, (2, 'Child')), (2, (1, 'Parent')), (1, (None, 'Root'))]
where the key of each element is a unique index for that element and 1st element in the value pair signifies the index of its parent element.
Now, the ultimate goal is to convert this JSON file into the following:
[(3, (2, 'Child Parent Root')), (2, (1, 'Parent Root')), (1, (None, 'Root'))]
where the 2nd element in the value pair for each item will be modified such that it has the concatenation of all the values up to its root ancestor.
The no. of levels is not fixed and can be up to 256. I know I can solve this problem by creating a tree DS and traversing it but the problem is the JSON file is huge (almost 180M items in the list).
Any idea on how can I achieve this efficiently? Suggestions involving Apache Spark would be fine as well.
CodePudding user response:
You can use a breadth-first search to find all ancestor element chains:
from collections import deque, defaultdict
d, d1 = [(3, (2, 'Child')), (2, (1, 'Parent')), (1, (None, 'Root'))], defaultdict(list)
for a, (b, c) in d:
d1[b].append((a, c))
q, r = deque([(1, d1[None][0][1])]), {}
while q:
r[n[0]] = (n:=q.popleft())[1]
q.extend([(a, b ' ' n[1]) for a, b in d1[n[0]]])
Now, r
stores the ancestor values for each element:
{1: 'Root', 2: 'Parent Root', 3: 'Child Parent Root'}
Then, using a list comprehension to update d
:
result = [(a, (b, r[a])) for a, (b, _) in d]
Output:
[(3, (2, 'Child Parent Root')), (2, (1, 'Parent Root')), (1, (None, 'Root'))]
An iterative approach such as BFS will eliminate the possibility of a RecursionError
which might occur when running DFS on a very large graph.
CodePudding user response:
In Spark this is treated a Graph
problem and solved using Vertex Centric Programming
. Unfortunately, GraphX
does not have a python compatible API.
Another option is to use graphframes
.
I have included here a logic using joins which mimics Vertex Centric Programming
but without using any libraries. Provided you can convert the data representation you have into a dataframe with id
, parent_id
and name
column.
from pyspark.sql import functions as F
from pyspark.sql.functions import col as c
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("nsankaranara").getOrCreate()
data = [(6, 3, "Child_3", ),
(5, 2, "Child_2", ),
(4, 2, "Child_1", ),
(3, 1, "Parent_2", ),
(2, 1, "Parent_1", ),
(1, None, "Root"), ]
df = spark.createDataFrame(data, ("id", "parent_id", "name", ))
df = (df.withColumn("mapped_parent_id", c("parent_id"))
.withColumn("visited", F.lit(False)))
start_nodes = df.filter(c("mapped_parent_id").isNull())
# Controls how deep we want to traverse
max_iter = 256
iter_counter = 0
# Iteratively identify the next child and add your name to them
while iter_counter < max_iter:
iter_counter = 1
df = (df.alias("a").join(start_nodes.alias("b"),
((c("a.parent_id") == c("b.id")) | (c("a.id") == c("b.id"))), how="left_outer"))
df = (df.select(c("a.id"),
c("a.parent_id"),
(F.when((c("b.id").isNotNull() & (c("a.id") != c("b.id"))), F.lit(None)).otherwise(c("a.mapped_parent_id"))).alias("mapped_parent_id"),
F.when((c("a.id") != c("b.id")), F.concat_ws(" ", c("a.name"), c("b.name"))).otherwise(c("a.name")).alias("name"),
(F.when(c("a.id") == c("b.id"), F.lit(True)).otherwise(c("a.visited"))).alias("visited")
))
start_nodes = df.filter(((c("mapped_parent_id").isNull()) & (c("visited") == False)))
if start_nodes.count() == 0:
# signifies that all nodes have been visited
break
df.select("id", "parent_id", "name").show(truncate = False)
Output
--- --------- ---------------------
|id |parent_id|name |
--- --------- ---------------------
|6 |3 |Child_3 Parent_2 Root|
|5 |2 |Child_2 Parent_1 Root|
|4 |2 |Child_1 Parent_1 Root|
|3 |1 |Parent_2 Root |
|2 |1 |Parent_1 Root |
|1 |null |Root |
--- --------- ---------------------