Trying to get a deeper understanding of how Spark works, I was playing around with the PySpark shell (2.4.0). I was looking for the difference between using limit(n).show() and show(n), and I ended up getting two very different performance times for two very similar queries. Below are the commands I ran. The Parquet file referenced in the code has about 50 columns and is over 50 GB in size on remote HDFS.
# Create dataframe
>>> df = sqlContext.read.parquet('hdfs://hdfs.host/path/to.parquet')
# Create test1 dataframe
>>> test1 = df.select('test_col')
>>> test1.schema
StructType(List(StructField(test_col,ArrayType(LongType,true),true)))
>>> test1.explain()
== Physical Plan ==
*(1) Project [test_col#40]
+- *(1) FileScan parquet [test_col#40]
Batched: false,
Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdfs.host/path/to.parquet],
PartitionCount: 25,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<test_col:array<bigint>>
# Create test2 dataframe
>>> test2 = df.select('test_col').limit(5)
>>> test2.schema
StructType(List(StructField(test_col,ArrayType(LongType,true),true)))
>>> test2.explain()
== Physical Plan ==
CollectLimit 5
+- *(1) Project [test_col#40]
   +- *(1) FileScan parquet [test_col#40]
Batched: false,
Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdfs.host/path/to.parquet],
PartitionCount: 25,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<test_col:array<bigint>>
Notice that the physical plan is almost identical for test1 and test2. The only exception is that test2's plan starts with "CollectLimit 5". With this set up, I ran test1.show(5) and test2.show(5). Test 1 returned its results instantaneously. Test 2 showed a progress bar with 2010 tasks and took about 20 minutes to complete (I only had one executor).
Question: Why did test 2 (with limit) perform so poorly compared to test 1 (without limit)? The data set and result set were identical, and the physical plan was nearly identical.
CodePudding user response:
Keep in mind:
- show() is an alias for show(20) and relies internally on take(n: Int): Array[T]
- limit(n: Int) returns another Dataset and is an expensive operation that reads the whole source
The sketch below illustrates how these two call paths differ.
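A minimal sketch of the two call paths, assuming the same df as in the question (the variable names here are illustrative, not from the original post):
# show(5) calls take(5) under the hood. take() launches a job that scans
# only as many partitions as needed to collect 5 rows -- often just one.
>>> rows = df.select('test_col').take(5)
# limit(5) is a transformation: it returns a new DataFrame whose plan
# carries a CollectLimit node; nothing executes until an action runs.
>>> limited = df.select('test_col').limit(5)
# The action on the limited DataFrame executes that whole plan, which can
# end up touching every partition of the source before keeping 5 rows.
>>> limited.show()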
CodePudding user response:
Limit results in a new DataFrame and takes longer because predicate pushdown is currently not supported by your input file format; Spark therefore reads the entire dataset and applies the limit afterwards.
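A practical takeaway, sketched under the same assumptions as above (the df from the question): if the goal is just to inspect a few rows, let show()/take() drive the scan instead of materializing a limited DataFrame first.
# Fast path: show(5) uses take(5) internally, scanning partitions incrementally
>>> df.select('test_col').show(5)
# head(5) behaves like take(5) and likewise avoids a separate limit() step
>>> rows = df.select('test_col').head(5)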