Spark DataFrame Lazy Evaluation when select function is called


Knowing that Spark only does the real work when an action is called (e.g. a show on a DataFrame), I have a doubt regarding the extent of this laziness.

Imagine the following scenario, where a DataFrame is built with 3 extra columns:

import org.apache.spark.sql.functions.lit

val df = otherDF
  .withColumn("aaa", lit("AAA"))
  .withColumn("bbb", lit("BBB"))
  .withColumn("ccc", lit("CCC"))

After this, I select only one column and call show (triggering an action):

df
  .select("aaa")
  .show

Will Spark compute only the "aaa" column and ignore the other ones if they are not needed? Or will it also evaluate and process the "bbb" and "ccc" columns, with the select function merely filtering the output subset?

The real scenario here is that I want to create a "master" DataFrame with many columns and complex transformations, and then have sub-processes select only a subset of its columns, adding more specific ones where needed. I want to guarantee that a sub-process needing only 10% of the columns is not penalized by the evaluation and processing of the complete master DataFrame (if that is possible). A minimal sketch of the pattern follows below.
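
To make the scenario concrete, here is a minimal runnable sketch of the pattern I have in mind (the sample data, expensiveUdf, and all column names are just stand-ins I made up); explain(true) prints the optimized plan, where one can check whether the unused columns survive:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// stand-in for the real source DataFrame
val otherDF = Seq(1, 2, 3).toDF("raw")

// hypothetical expensive transformation, faked here with a UDF
val expensiveUdf = udf((i: Int) => i.toString * 1000)

// "master" DataFrame with cheap and expensive derived columns
val masterDF = otherDF
  .withColumn("cheap", lit("X"))
  .withColumn("expensive", expensiveUdf(col("raw")))

// a sub-process that needs only a small subset of the columns;
// the optimized plan shows whether "expensive" was pruned away
masterDF.select("cheap").explain(true)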

Thanks in advance

CodePudding user response:

I prepared this sample code:

import org.apache.spark.sql.functions._

val input = spark.read.format("csv")
  .option("header", "true")
  .load("dbfs:/FileStore/shared_uploads/*@gmail.com/city_temperature.csv")

val df = input
  .withColumn("aaa", lit("AAA"))
  .withColumn("bbb", lit("BBB"))
  .withColumn("ccc", lit("CCC"))
  .withColumn("generated_value", monotonically_increasing_id)
import org.apache.spark.sql.execution.debug._
df.select("aaa", "generated_value").debugCodegen()

I read a CSV, then add some columns, and at the end select only a few of them. I added monotonically_increasing_id to also include a column that is not a literal value but is generated dynamically.

The .debugCodegen() call shows us the code that was generated, so let's take a look at the first version, where generated_value is also selected:

/* 029 */   private void project_doConsume_0(InternalRow inputadapter_row_0) throws java.io.IOException {
/* 030 */     final long project_value_1 = partitionMask + project_count_0;
/* 031 */     project_count_0++;
/* 032 */
/* 033 */     project_mutableStateArray_0[0].reset();
/* 034 */
/* 035 */     project_mutableStateArray_0[0].write(0, ((UTF8String) references[0] /* literal */));
/* 036 */
/* 037 */     project_mutableStateArray_0[0].write(1, project_value_1);
/* 038 */     append((project_mutableStateArray_0[0].getRow()));
/* 039 */
/* 040 */   }

Here you can see that the code needed to calculate the id was generated and will later be executed; it is this part:

final long project_value_1 = partitionMask + project_count_0;

Now the same code, but with the second column removed from the select. The first lines of the generated code are the same as in the previous example:

import org.apache.spark.sql.execution.debug._
df.select("aaa").debugCodegen()

project_doConsume_0 is different:

/* 024 */   private void project_doConsume_0(InternalRow inputadapter_row_0) throws java.io.IOException {
/* 025 */     project_mutableStateArray_0[0].reset();
/* 026 */
/* 027 */     project_mutableStateArray_0[0].write(0, ((UTF8String) references[0] /* literal */));
/* 028 */     append((project_mutableStateArray_0[0].getRow()));
/* 029 */
/* 030 */   }

The code needed for monotonically_increasing_id was not generated, which means that Spark is able to push the projection down and compute only the columns that are actually needed.
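
The same conclusion can be reached without reading generated Java: explain(true) prints the parsed, analyzed, optimized, and physical plans, and pruned columns simply do not appear in the optimized plan. A short sketch, reusing the df from above:

// only "aaa" should remain in the optimized plan's Project,
// and the monotonically_increasing_id expression should be gone
df.select("aaa").explain(true)

// compare with the two-column select, where the id expression
// should still be present
df.select("aaa", "generated_value").explain(true)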
