I need to run a Spark streaming job on a Google Dataproc cluster using Airflow. I learnt that Dataproc jobs can be automatically restarted in case of failures via optional settings, as shown here. And it works properly if I submit the job through the GCP console.
However, I don't know where to include the argument max-failures-per-hour when submitting the Dataproc job through Airflow's DataprocSubmitJobOperator.
Here is the Airflow DAG code:
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

spark_submit_properties = {
    'spark.dynamicAllocation.enabled': 'true',
    # ...
}

job_args = {
    "gcs_job": {
        "args": ["--"],              # placeholder
        "jar_file_uris": [f"--"],    # placeholder
        "main_class": "---",         # placeholder
        "properties": spark_submit_properties
    }
}

submit_job = {
    "reference": {"project_id": v_project_id},
    "placement": {"cluster_name": v_cluster_name},
    "spark_job": job_args["gcs_job"]
}

spark_job_submit = DataprocSubmitJobOperator(
    task_id="XXXX",
    job=submit_job,
    location="us-central1",
    gcp_conn_id=v_conn_id,
    project_id=v_project_id
)
I am not even sure whether max_failures_per_hour or max-failures-per-hour is the proper argument name to pass to DataprocSubmitJobOperator.
If I add it to job_args in the code above, I get this error:
Exception:
Protocol message SparkJob has no "max_failures_per_hour" field.
And if I add max_failures_per_hour=1 directly to DataprocSubmitJobOperator(), I get a DAG import error stating that max_failures_per_hour is an invalid argument.
Can someone please help?
CodePudding user response:
There is no parameter max_failures_per_hour in Airflow's DataprocSubmitJobOperator.
As you mentioned, the console lets you pass the parameter max-failures-per-hour when submitting a Dataproc job.
For example, using the gcloud command:
gcloud dataproc jobs submit JOB_TYPE \
    --region=REGION \
    --max-failures-per-hour=NUMBER
The maximum number of times a job can be restarted per hour is 10.
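For instance, a restartable Spark job submission could look like the following (the cluster, region, bucket, jar, and main class here are hypothetical placeholders):
gcloud dataproc jobs submit spark \
    --cluster=my-cluster \
    --region=us-central1 \
    --class=com.example.MyStreamingApp \
    --jars=gs://my-bucket/my-streaming-app.jar \
    --max-failures-per-hour=5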
CodePudding user response:
Found it. You can add it under the "scheduling" config as below:
submit_job = {
    "reference": {"project_id": v_project_id},
    "placement": {"cluster_name": v_cluster_name},
    "scheduling": {"max_failures_per_hour": 5},
    "spark_job": job_args["gcs_job"]
}
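For completeness, a fuller sketch of the operator call with the scheduling block in place might look like this (reusing the variables from the question; the task id, jar URI, and main class are hypothetical placeholders, and newer versions of the apache-airflow-providers-google package use region= instead of location=):
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# "scheduling" sits at the Job level, next to "spark_job", not inside SparkJob;
# that is why adding max_failures_per_hour inside the spark_job dict failed.
submit_job = {
    "reference": {"project_id": v_project_id},
    "placement": {"cluster_name": v_cluster_name},
    "scheduling": {"max_failures_per_hour": 5},
    "spark_job": {
        "jar_file_uris": ["gs://my-bucket/my-streaming-app.jar"],  # hypothetical URI
        "main_class": "com.example.MyStreamingApp",                # hypothetical class
        "properties": spark_submit_properties,
    },
}

spark_job_submit = DataprocSubmitJobOperator(
    task_id="submit_spark_streaming_job",  # hypothetical task id
    job=submit_job,
    location="us-central1",                # region= in newer provider versions
    gcp_conn_id=v_conn_id,
    project_id=v_project_id,
)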