Spark - Issue with select statement run multiple times

Time: 04-13

Small question regarding Spark please.

What I would like to achieve is quite straightforward: I have a Spark cluster with 10 executors which I would like to utilize. I need to run a query selecting 10 rows from the DB.

My expectation is something like: the query selects 10 rows, say rows 1 2 3 4 5 6 7 8 9 10. Then a map operation is applied to each row: executor 1 applies the operation Op to one row, executor 2 applies Op to another row, etc...

Note, my operation Op has proper logging and proper KPI metrics.

Therefore, I went to try this:

 public static void main(String[] args) {
        final String query = "SELECT TOP(10) id, last_name, first_name FROM mytable WHERE ...";
        final SparkSession sparkSession = SparkSession.builder().getOrCreate();
        final Properties   dbConnectionProperties   = new Properties();
        dbConnectionProperties.putAll([...]);

        final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
        topTenDataSet.show();
        
        final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());

        LOGGER.info("the count is expected to be 10: " + topTenDataSetAfterMap.count() + " " + topTenDataSetAfterMap.showString(100000, 1000000, false));
        sparkSession.stop();
    }

With this code, there is a strange outcome.

Both topTenDataSet.show(); and topTenDataSetAfterMap.count() show 10 rows, happy.

But when I look at the logs from the operation Op performOperationWithLogAndKPI, I can see many more than 10 log lines and many more than 10 metrics. Meaning, I can see executor 1 performing the operation 10 times, but also executor 2 performing the operation 10 times, etc.

It seems like each executor runs its own "SELECT TOP(10)" against the DB and applies the map function to its own copy of the dataset.

May I ask: did I make some mistake in the code?

Is my understanding not correct?

How can I achieve the expected behavior: querying once, and having each executor apply a function to part of the result set?

Thank you

CodePudding user response:

If you're trying to execute multiple actions on the same Dataset, cache it. That way the "select top 10" query should be executed only once:

final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
topTenDataSet.cache();
topTenDataSet.show();
final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());
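The underlying reason is that a Dataset is a lazy description of a computation: each action (show(), count()) re-runs the whole lineage, including the JDBC read, unless the intermediate result is cached. As a plain-Java analogy (not Spark API; all names here are made up for illustration), cache() behaves like memoizing a supplier:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RecomputeDemo {
    // Memoizing wrapper: compute once on first call, reuse afterwards
    // (loosely what cache() does for a Dataset).
    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;
            @Override public synchronized T get() {
                if (!computed) { value = source.get(); computed = true; }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger dbReads = new AtomicInteger();
        // Stand-in for the JDBC read: counts how many times it actually runs.
        Supplier<String> query = () -> { dbReads.incrementAndGet(); return "10 rows"; };

        query.get(); // analogous to show()
        query.get(); // analogous to count()
        System.out.println("without memoization: " + dbReads.get()); // prints 2

        dbReads.set(0);
        Supplier<String> cached = memoize(query);
        cached.get(); // analogous to show()
        cached.get(); // analogous to count()
        System.out.println("with memoization: " + dbReads.get()); // prints 1
    }
}
```

Without cache(), every action against topTenDataSet (or anything derived from it) replays the source read, which is why the SELECT, and the map over its rows, shows up multiple times in the executor logs.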

