Passing Restart job arguments while submitting Dataproc job using DataprocSubmitJobOperator


I need to run a Spark streaming job on a Google Dataproc cluster using Airflow. I learnt that Dataproc jobs can be automatically restarted in case of failures via the optional settings shown here, and it works properly if I submit the job through the GCP console.

However, I don't know where to include the max-failures-per-hour argument when submitting the Dataproc job through Airflow's DataprocSubmitJobOperator.

Here is the Airflow DAG code:

spark_submit_properties = {
    'spark.dynamicAllocation.enabled': 'true',
    # ...
}

job_args = {
    "gcs_job": {
        "args": [--],
        "jar_file_uris": [f"--"],
        "main_class": "---",
        "properties": spark_submit_properties
    }
}

submit_job = {
    "reference": {"project_id": v_project_id},
    "placement": {"cluster_name": v_cluster_name},
    "spark_job": job_args["gcs_job"]
}


spark_job_submit = DataprocSubmitJobOperator(
        task_id="XXXX",
        job=submit_job,
        location="us-central1",
        gcp_conn_id=v_conn_id,
        project_id=v_project_id
    )

I am not even sure if max_failures_per_hour or max-failures-per-hour is the proper argument name to pass to DataprocSubmitJobOperator.

If I add it to job_args in the above code, I get the error:

Exception:
Protocol message SparkJob has no "max_failures_per_hour" field.

And if I add max_failures_per_hour = 1 directly to DataprocSubmitJobOperator(), I get a DAG import error stating that max_failures_per_hour is an invalid argument.

Can someone please help?

CodePudding user response:

There is no parameter max_failures_per_hour in Airflow's DataprocSubmitJobOperator.

As you have mentioned, using the console you can pass the max-failures-per-hour parameter when submitting a Dataproc job.

For example, using the gcloud command:

gcloud dataproc jobs submit JOB_TYPE \
    --region=REGION \
    --max-failures-per-hour=NUMBER

The maximum number of times a job can be restarted per hour is 10.
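
For instance, a restartable Spark job might be submitted like this (the cluster, bucket, and class names below are placeholders, not taken from the question):

gcloud dataproc jobs submit spark \
    --cluster=my-cluster \
    --region=us-central1 \
    --class=com.example.StreamingJob \
    --jars=gs://my-bucket/streaming-job.jar \
    --max-failures-per-hour=5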

CodePudding user response:

Found it. You can add it under the "scheduling" config as below:

submit_job = {
    "reference": {"project_id": v_project_id},
    "placement": {"cluster_name": v_cluster_name},
    "scheduling": {"max_failures_per_hour": 5},
    "spark_job": job_args["gcs_job"]
}
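
Putting it together, here is a minimal sketch of the full operator call (keeping the variable names from the question; the task_id is a placeholder, and newer versions of the Google provider use region instead of location):

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

submit_job = {
    "reference": {"project_id": v_project_id},
    "placement": {"cluster_name": v_cluster_name},
    # Restart the job automatically, at most 5 times per hour (the limit is 10).
    "scheduling": {"max_failures_per_hour": 5},
    "spark_job": job_args["gcs_job"],
}

spark_job_submit = DataprocSubmitJobOperator(
    task_id="spark_streaming_job",   # placeholder task id
    job=submit_job,
    location="us-central1",          # 'region' in newer provider versions
    gcp_conn_id=v_conn_id,
    project_id=v_project_id,
)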