I have a list of directed edges which represent a tree. 'u v' means u is a child of v.
from pyspark import SparkContext  # conf is an existing SparkConf

sc = SparkContext(conf=conf)
lines = sc.textFile("/content/sample_data/data.txt")
lines.take(10)
['0 0', '1 0', '2 0', '3 1', '4 1', '5 2', '6 2', '7 3', '8 3', '9 4']
I converted the above to the following form, stored as intermediate:
[(0, ('out', 0)),
(0, ('in', 0)),
(1, ('out', 0)),
(0, ('in', 1)),...]
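A minimal sketch of how such an intermediate can be built from lines (the parsing helper to_pairs here is illustrative):

# Sketch: for each edge "u v" (u is a child of v), emit
# (u, ('out', v)) for u's parent and (v, ('in', u)) for u as a child of v.
def to_pairs(line):
    u, v = line.split()
    u, v = int(u), int(v)
    return [(u, ('out', v)), (v, ('in', u))]

intermediate = lines.flatMap(to_pairs)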
From the above, I am trying to build an adjacency list of the form:
[(8721, [('out', 4360), ('in', 17443), ('in', 17444)]),
(9291, [('out', 4645), ('in', 18583), ('in', 18584)]),
(9345, [('out', 4672), ('in', 18691), ('in', 18692)]),..]
Here, the first row says that 8721 is a child of 4360, and 17443 and 17444 are children of 8721.
I am using the groupByKey or reduceByKey methods exposed by Spark:
intermediate.groupByKey().mapValues(list)
The line above takes a long time: almost 250 seconds for 100 MB of test data on an 8-core machine with 12 GB of RAM. Eventually I have to run it on >15 GB of data in a distributed environment.
I understand that groupByKey shuffles data across all nodes. Is there any way to avoid the shuffle in my case? Any suggestions on how to optimise this operation are appreciated.
CodePudding user response:
You can't avoid the shuffle, since you are grouping rows of your dataset. However, you can use the DataFrame API instead of RDDs; the DataFrame API is generally more performant than RDDs (see this answer).
If your txt file is in the following form:
0 0
1 0
...
then you can read it as a DataFrame:
from pyspark.sql import functions as f  # for f.lit and f.collect_list below; spark is an existing SparkSession

df = spark.read.csv('test.txt', sep=' ')
df.show()
+---+---+
|_c0|_c1|
+---+---+
|  0|  0|
|  1|  0|
|  2|  0|
|  3|  1|
|  4|  1|
|  5|  2|
|  6|  2|
|  7|  3|
|  8|  3|
|  9|  4|
+---+---+
unionAll the edges with their reversed copies, attaching a type column (note that here 'in' ends up pointing to a node's parent and 'out' to its children, the reverse of the question's labels):
df2 = df.withColumn('_c2', f.lit('in')).unionAll(df.select('_c1', '_c0').withColumn('_c2', f.lit('out')))
df2.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  0|  0| in|
|  1|  0| in|
|  2|  0| in|
|  3|  1| in|
|  4|  1| in|
|  5|  2| in|
|  6|  2| in|
|  7|  3| in|
|  8|  3| in|
|  9|  4| in|
|  0|  0|out|
|  0|  1|out|
|  0|  2|out|
|  1|  3|out|
|  1|  4|out|
|  2|  5|out|
|  2|  6|out|
|  3|  7|out|
|  3|  8|out|
|  4|  9|out|
+---+---+---+
and groupBy the result:
df3 = df2.groupBy('_c0').agg(f.collect_list(f.array('_c2','_c1'))).toDF('edge', 'list')
df3.show(truncate=False)
df3.printSchema()
+----+---------------------------------------+
|edge|list                                   |
+----+---------------------------------------+
|7   |[[in, 3]]                              |
|3   |[[in, 1], [out, 7], [out, 8]]          |
|8   |[[in, 3]]                              |
|0   |[[in, 0], [out, 0], [out, 1], [out, 2]]|
|5   |[[in, 2]]                              |
|6   |[[in, 2]]                              |
|9   |[[in, 4]]                              |
|1   |[[in, 0], [out, 3], [out, 4]]          |
|4   |[[in, 1], [out, 9]]                    |
|2   |[[in, 0], [out, 5], [out, 6]]          |
+----+---------------------------------------+
root
 |-- edge: string (nullable = true)
 |-- list: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
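If you still need the result as plain Python tuples in the question's format (where 'out' points to the parent and 'in' to the children, the opposite of the labels used above), a minimal sketch on top of df3 could be:

# Sketch: map the DataFrame rows back to
# [(node, [('out', parent), ('in', child), ...]), ...],
# flipping the labels to match the question's convention.
flip = {'in': 'out', 'out': 'in'}
adjacency = df3.rdd.map(
    lambda row: (int(row['edge']), [(flip[t], int(n)) for t, n in row['list']])
)
adjacency.take(3)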