Small question regarding Spark please.
What I would like to achieve is quite straightforward: I have a Spark cluster with 10 executors which I would like to utilize. I need to run a query selecting 10 rows from the DB.
My expectation is something like this: select 10 rows, and the results are rows 1 2 3 4 5 6 7 8 9 10. Then apply a map operation to each row, so that executor 1 applies the operation Op to one of the rows, executor 2 applies Op to another row, etc.
Note: my operation Op has proper logging and proper KPIs.
Therefore, I tried this:
public static void main(String[] args) {
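// jdbc(url, table, props) expects something valid in a FROM clause, so the query is wrapped as an aliased subquery
final String query = "(SELECT TOP(10) id, last_name, first_name FROM mytable WHERE ...) AS t";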
final SparkSession sparkSession = SparkSession.builder().getOrCreate();
final Properties dbConnectionProperties = new Properties();
dbConnectionProperties.putAll([...]);
final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
topTenDataSet.show();
final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());
LOGGER.info("the count is expected to be 10: " + topTenDataSetAfterMap.count() + "\n" + topTenDataSetAfterMap.showString(100000, 1000000, false));
sparkSession.stop();
}
With this code, there is a strange outcome.
Both topTenDataSet.show() and topTenDataSetAfterMap.count() show 10 rows, as expected.
But when I look at the logs from the operation Op (performOperationWithLogAndKPI), I can see far more than 10 log lines and far more than 10 metrics. That is, I can see executor 1 performing the operation 10 times, but also executor 2 performing it 10 times, etc.
It seems as if each executor runs its own "SELECT TOP(10)" against the DB and applies the map function to its own copy of the dataset.
May I ask: did I make a mistake in the code?
Is my understanding incorrect?
How can I achieve the expected behavior: query the DB once, and have each executor apply the function to its part of the result set?
Thank you
Answer:
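If you're executing multiple actions on the same Dataset, try caching it. Without caching, every action (show(), count(), showString(), ...) re-evaluates the whole lineage, including the JDBC read. With caching, the SELECT TOP 10 query should be executed only once: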
final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
topTenDataSet.cache();
topTenDataSet.show();
final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());
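To illustrate why each action repeats the work, here is a toy model in plain Java. It is not the Spark API: LazyDataset, readTopTen and op are hypothetical stand-ins for the Dataset, the JDBC read and performOperationWithLogAndKPI. Transformations only extend a recipe, and every action re-runs it unless a result has been cached. Unlike Spark's cache(), which marks the Dataset in place, this cache() returns a new memoizing wrapper. The sketch also caches *after* the map, which goes a step beyond the snippet above: caching only topTenDataSet avoids repeating the query, but the map would still run again on every action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Toy model of lazy lineage (NOT the Spark API): each action re-runs the
// whole chain of transformations unless an intermediate result is cached.
public class LazyLineageDemo {

    static final class LazyDataset<T> {
        private final Supplier<List<T>> lineage; // recipe that recomputes the rows

        LazyDataset(Supplier<List<T>> lineage) {
            this.lineage = lineage;
        }

        // Transformation: only extends the recipe, nothing is executed yet.
        <R> LazyDataset<R> map(Function<T, R> f) {
            return new LazyDataset<>(() ->
                    lineage.get().stream().map(f).collect(Collectors.toList()));
        }

        // Memoizes the first evaluation, loosely like cache()/persist().
        LazyDataset<T> cache() {
            List<List<T>> memo = new ArrayList<>(1);
            return new LazyDataset<>(() -> {
                if (memo.isEmpty()) {
                    memo.add(lineage.get()); // first action computes and stores
                }
                return memo.get(0);
            });
        }

        // Actions: each one evaluates the recipe (or reads the memo if cached).
        long count() { return lineage.get().size(); }
        List<T> collect() { return lineage.get(); }
    }

    static final AtomicInteger queries = new AtomicInteger();
    static final AtomicInteger ops = new AtomicInteger();

    // Hypothetical stand-in for the JDBC read of the TOP(10) query: rows 1..10.
    static List<Integer> readTopTen() {
        queries.incrementAndGet();
        return IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
    }

    // Hypothetical stand-in for performOperationWithLogAndKPI: counts calls.
    static String op(Integer row) {
        ops.incrementAndGet();
        return "processed-" + row;
    }

    // Runs two actions (like count() and showString()) and returns
    // {number of queries, number of op calls}.
    static int[] run(boolean cacheAfterMap) {
        queries.set(0);
        ops.set(0);
        LazyDataset<String> mapped =
                new LazyDataset<Integer>(LazyLineageDemo::readTopTen).map(LazyLineageDemo::op);
        if (cacheAfterMap) {
            mapped = mapped.cache();
        }
        mapped.count();
        mapped.collect();
        return new int[] {queries.get(), ops.get()};
    }

    public static void main(String[] args) {
        int[] plain = run(false);
        int[] cached = run(true);
        System.out.println("no cache: queries=" + plain[0] + ", ops=" + plain[1]);
        System.out.println("cached:   queries=" + cached[0] + ", ops=" + cached[1]);
    }
}
```

With these stand-ins, the uncached run reports 2 queries and 20 calls to op, while the cached run reports 1 query and 10 calls. In real Spark, caching is best-effort: evicted partitions, task retries or speculative execution can still make Op run more than once for a row, so its logging and KPIs should tolerate occasional repeats.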