I have DAGs that run a preprocessing task to fetch the Spark config from another RESTful API and then actually submit the Spark job with different parameters.
This mostly runs fine; however, some of the fields are not templated (according to the official docs). How can I make them templated?
Overall flow:
start >> preprocessing >> spark_submit_task >> end
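The preprocessing task roughly does the following, shown here as a simplified sketch (the real endpoint URL and response keys differ; this assumes Airflow 1.10.x):

import requests
from airflow.operators.python_operator import PythonOperator

def fetch_spark_config(**context):
    # call the REST API that serves the Spark settings (URL is a placeholder)
    config = requests.get("http://config-service/spark-config").json()
    ti = context["ti"]
    # push each value under the key that the spark_submit_task templates pull below
    for key in ("total_executor_cores", "executor_cores", "executor_memory",
                "driver_memory", "resource_name"):
        ti.xcom_push(key=key, value=config[key])

preprocessing = PythonOperator(
    task_id="preprocessing",
    python_callable=fetch_spark_config,
    provide_context=True,  # required on Airflow 1.10.x to receive **context
    dag=dag,
)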
The task definition for the SparkSubmitOperator:
spark_submit_task = SparkSubmitOperator(
    application="/mnt/service.jar",
    task_id=dag_id,
    conn_id=conn_id,
    java_class=...,  # value truncated in the original post
    total_executor_cores="{{ ti.xcom_pull(task_ids='preprocessing', key='total_executor_cores') }}",
    executor_cores="{{ ti.xcom_pull(task_ids='preprocessing', key='executor_cores') }}",
    executor_memory="{{ ti.xcom_pull(task_ids='preprocessing', key='executor_memory') }}",
    driver_memory="{{ ti.xcom_pull(task_ids='preprocessing', key='driver_memory') }}",
    num_executors='1',
    name=dag_id,
    spark_binary='/usr/local/bin/spark-submit',
    application_args=[
        "--name",
        "{{ ti.xcom_pull(task_ids='preprocessing', key='resource_name') }}",
    ],
)
CodePudding user response:
I don't know if you searched for the documentation of the specific 1.10.12 version of Airflow. In any case, the most up-to-date documentation for that operator is https://airflow.apache.org/docs/apache-airflow-providers-apache-spark/stable/_api/airflow/providers/apache/spark/operators/spark_submit/index.html (there is an SEO problem with the docs; search engines often point to old versions).
Anyway, you can modify the template fields. Create a new file called something like mysparksubmit_operator.py in the plugins folder within your Airflow directory. Then copy the code from https://github.com/apache/airflow/blob/main/airflow/providers/apache/spark/operators/spark_submit.py and add as many template fields as you want to the sequence that defines them (around line 75).
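For illustration, a sketch of what the copied file could look like after editing that sequence, assuming the underscore-prefixed attribute names this provider version uses (everything else stays as in the repository):

# mysparksubmit_operator.py -- copy of the provider operator with extra template fields
class SparkSubmitOperator(BaseOperator):
    template_fields = (
        "_application",           # original entries (abridged here)
        "_conf",
        "_name",
        "_application_args",
        "_env_vars",
        "_java_class",            # added
        "_total_executor_cores",  # added
        "_executor_cores",        # added
        "_executor_memory",       # added
        "_driver_memory",         # added
    )
    # ... rest of the copied operator code unchanged ...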
Finally, provided that you execute your DAGs from the dags folder within the Airflow directory, you will be able to import your operator with
from mysparksubmit_operator import SparkSubmitOperator
Also, if you think this can be useful to other people, I encourage you to open a Pull Request in the official repository. You can follow the guidelines in https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst
CodePudding user response:
You can subclass the operator and assign the template_fields you'd like, since template_fields is a class attribute. Then use the subclassed operator in your DAG.
Just to note, the field name needs to be the name of the instance attribute. For the Apache Spark provider, the instance attributes have an underscore prepended, so check the source code first to determine the correct attribute names.
For example, if you wanted the java_class
arg to be templated, you could do the following:
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator as _SparkSubmitOperator


class SparkSubmitOperator(_SparkSubmitOperator):
    template_fields = list(_SparkSubmitOperator.template_fields)
    template_fields.append("_java_class")
Of course, you can also set the entire list of fields you want explicitly.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator as _SparkSubmitOperator


class SparkSubmitOperator(_SparkSubmitOperator):
    template_fields = [
        "_application",
        "_conf",
        "_files",
        "_py_files",
        "_jars",
        "_driver_class_path",
        "_packages",
        "_exclude_packages",
        "_keytab",
        "_principal",
        "_proxy_user",
        "_name",
        "_application_args",
        "_env_vars",
        "_java_class",
    ]
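Either way, the subclassed operator is then used in the DAG exactly like the original, and the extra fields are rendered at runtime; a small sketch (the XCom key here is just an example):

spark_submit_task = SparkSubmitOperator(
    task_id=dag_id,
    conn_id=conn_id,
    application="/mnt/service.jar",
    # rendered now that "_java_class" is listed in template_fields
    java_class="{{ ti.xcom_pull(task_ids='preprocessing', key='java_class') }}",
    name=dag_id,
)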