Home > Blockchain >  spark spark.read().load().select().filter() vs spark.read().option(query) BIG time diference
spark spark.read().load().select().filter() vs spark.read().option(query) BIG time diference

Time:01-11

Hello I am working on a project where I have to pull data between 2018 and 2023. It's about 200 million records (not that many), But now I am confused with these two approaches to load data.

I did the query in oracle DB and it takes between 80 to 150 seconds.

Then I tried it like this with spark:

    DXHS_FACTURACION_CONSUMOS = spark.read \
        .format("jdbc") \
        .option("url", url_SGC_ORACLE) \
        .option("dbtable", 'DXHS_FACTURACION_CONSUMOS') \
        .option("user", username) \
        .option("password", password) \
        .option("driver", driver_oracle) \
        .load().select('ID_SERVICIO','ID_MES','CS_FACTURADO','CS_ANULADO','CS_REFACTURADO','IN_FACTURADO','CC_DIAS_FACTURADO','FE_FACTURA_ULTIMA') \
DXHS_FACTURACION_CONSUMOS.show()

It has been running for about 20 minutes and it does not finish, then I tried:

query2 = """
  SELECT ID_SERVICIO, ID_MES, CS_FACTURADO, CS_ANULADO, CS_REFACTURADO, IN_FACTURADO, CC_DIAS_FACTURADO, FE_FACTURA_ULTIMA FROM DXHS_FACTURACION_CONSUMOS WHERE ID_MES >=201801
"""

DXHS_FACTURACION_CONSUMOS = spark.read \
    .format("jdbc") \
    .option("url", url_SGC_ORACLE) \
    .option("query", query2) \
    .option("user", username) \
    .option("password", password) \
    .option("driver", driver_oracle) \
    .load().alias('DXHS_FACTURACION_CONSUMOS')



DXHS_FACTURACION_CONSUMOS.show()

It takes about 90 seconds to finish.

Does the first example load the whole table and then it start filtering? while the second example filter first using the database and loads only the required data to spark? or why is it so big of a difference?

Thank you for the knowledge.

CodePudding user response:

  • .option("dbtable", 'DXHS_FACTURACION_CONSUMOS') translates to select * from DXHS_FACTURACION_CONSUMOS.
  • .option("query", query2) executes query2 on DB.

Hence the first one is slower and second one is much faster, probably because of the where clause.

You're not using anything specific to Spark here. If you do want to read large amount of data faster then use partitionColumn to make Spark run multiple select queries in parallel. E.g. you might be able to use year as the partitionColumn if you have it.

CodePudding user response:

Update: just realized you've a where condition as well in your Query, but same is not there in former. This means you're effectively doing select * there, without any filter condition.

As for columns,

Probably you're using Spark Version below 3 and expecting Spark to take care of selecting only required columns. Spark 2.4 had predicate filters pushdown but lacked projection pushdown. Let me know if this is not the case, I can help you investigate

Predicate Pushdown refers to where, filter, IN, like etc clauses which affects the number of rows returned. It basically is row based filtering.

As opposed to this, Spark 3 introduced Projection Pushdown which affects the number of columns returned. Thus this is column based filtering.

Thus in you code, with the query approach you're explicitly defining the query (and hence columns) to be used for reading. However when specifying just the table Spark will read the all the columns (i.e. No column pruning) even if later only specefic columns are used. You can check this behavior using .explain()

From Spark 3, Spark takes care of doing the same.

See : What is the difference between "predicate pushdown" and "projection pushdown"?

  • Related