I want to get a list of all read/write queries that have been made (using the Dataset API) in the current Spark job. For example:
Dataset<Row> readDataFrame = spark.read()
        .format("jdbc")
        .option("url", drivingUrl)
        .option("dbtable", "Select * from A where country_code='US'")
        .option("driver", driver)
        .load();
I expect to capture the query: Select * from A where country_code='US'. I tried using listeners for this so that I can capture this information for any spark-submit job I run, without having to alter the main code itself.
What I tried
- QueryExecutionListener
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
    SparkPlan sparkPlan = qe.executedPlan();
    // Tried to search the methods/properties inside it, but couldn't find anything
}
I searched the SQLMetrics, the child Spark plans, and so on, but couldn't find the information I'm looking for.
- SparkListenerSQLExecutionStart
@Override
public void onOtherEvent(SparkListenerEvent event) {
    if (event instanceof SparkListenerSQLExecutionStart) {
        SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart = (SparkListenerSQLExecutionStart) event;
        SparkPlanInfo sparkPlanInfo = sparkListenerSQLExecutionStart.sparkPlanInfo();
        System.out.println(sparkListenerSQLExecutionStart.description());
        System.out.println(sparkListenerSQLExecutionStart.details());
        System.out.println(sparkListenerSQLExecutionStart.physicalPlanDescription());
    }
}
Here too, these fields (and the others I looked at) didn't have the query information I was looking for.
I believe it's possible to capture this information, since I have seen projects like SparkSplineAgent and questions on StackOverflow like this one do it, but I haven't been able to figure out how.
Can anyone please help me out here?
CodePudding user response:
After a lot of trial and error, I finally found a way to do the above. In the listener that implements QueryExecutionListener, I added:
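// Imports needed by the method below
import java.util.LinkedList;
import java.util.Queue;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation;
import org.apache.spark.sql.sources.BaseRelation;
import scala.collection.Iterator; // children() returns a Scala Seq, so this is the Scala iterator

@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
    // Start from the analyzed logical plan, which still holds the source relations
    LogicalPlan analyzedPlan = qe.analyzed();
    // Maintain a queue of plans still to process (breadth-first traversal)
    Queue<LogicalPlan> queue = new LinkedList<>();
    queue.add(analyzedPlan);
    while (!queue.isEmpty()) {
        // Get the first plan from the queue
        LogicalPlan curPlan = queue.remove();
        if (curPlan instanceof LogicalRelation) {
            LogicalRelation logicalRelation = (LogicalRelation) curPlan;
            BaseRelation baseRelation = logicalRelation.relation();
            if (baseRelation instanceof JDBCRelation) {
                JDBCRelation jdbcRelation = (JDBCRelation) baseRelation;
                // Prints the value that was passed as the "dbtable" option
                System.out.println(jdbcRelation.jdbcOptions().table());
            }
            System.out.println(logicalRelation.relation());
        }
        // Add all child plans to the queue
        Iterator<LogicalPlan> childItr = curPlan.children().iterator();
        while (childItr.hasNext()) {
            LogicalPlan logicalPlan = childItr.next();
            queue.add(logicalPlan);
        }
    }
}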
This gave me the desired output of
SELECT * from A where country_code='US'
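For anyone who wants to try the traversal logic on its own, here is a minimal sketch of the same breadth-first walk that runs without Spark on the classpath. PlanNode, JdbcLeaf and PlanTraversalSketch are hypothetical stand-ins I made up for LogicalPlan and a LogicalRelation wrapping a JDBCRelation; they are not Spark classes, and the second source "B" is just an illustrative table name. It only shows how the queue visits every node and collects the table/query strings from the matching leaves.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for Spark's LogicalPlan: a named node with child plans.
class PlanNode {
    final String name;
    final List<PlanNode> children;

    PlanNode(String name, PlanNode... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }
}

// Hypothetical stand-in for a LogicalRelation that wraps a JDBCRelation.
class JdbcLeaf extends PlanNode {
    final String tableOrQuery;

    JdbcLeaf(String tableOrQuery) {
        super("JdbcLeaf");
        this.tableOrQuery = tableOrQuery;
    }
}

class PlanTraversalSketch {
    // Walks the plan tree breadth-first and collects the table/query string
    // of every JDBC leaf, in the order the leaves are visited.
    static List<String> collectJdbcSources(PlanNode root) {
        List<String> found = new ArrayList<>();
        Queue<PlanNode> queue = new ArrayDeque<>();
        queue.add(root);
        while (!queue.isEmpty()) {
            PlanNode cur = queue.remove();
            if (cur instanceof JdbcLeaf) {
                found.add(((JdbcLeaf) cur).tableOrQuery);
            }
            // Enqueue the children so they are processed after the current level
            queue.addAll(cur.children);
        }
        return found;
    }

    public static void main(String[] args) {
        PlanNode plan = new PlanNode("Join",
                new PlanNode("Filter", new JdbcLeaf("Select * from A where country_code='US'")),
                new JdbcLeaf("B"));
        for (String source : collectJdbcSources(plan)) {
            System.out.println(source);
        }
    }
}
```

Because the walk is breadth-first, sources closer to the root of the plan are reported before deeper ones, so a job that joins several JDBC reads yields one entry per read.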