How to pass args to DataprocSubmitJobOperator in Airflow?


I have a Spark job that takes its arguments as key-value pairs and maps them in code as follows:



    val props = Utils.mapArguments(args)

    println(props)
    val gcsFolder = props("gcsPath")
    val testId = props("test.id")
    val bUrl = props("b.url")
    ......
    ......

Earlier, I used to submit the job to the Dataproc cluster from a shell script:

gcloud dataproc jobs submit spark --async  --project testing-1 \
--cluster test-cluster \
--region us-central1 \
--properties spark.ui.killEnabled=true,\
spark.dynamicAllocation.enabled=true,\
spark.ui.enabled=true \
--class com.walmart.ei.testClass \
--jars gs://testing-bucket/test-1.1.0.jar \
-- master=yarn \
gcsPath=gs://testing_part/path/ \
test.id=404 \
correlation.id=AUDIT_2 \
b.url=http://testing-v1.net/test/

Now, with Airflow, we are trying to submit it with DataprocSubmitJobOperator as follows:


# SPARK JOB CONF
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["gs://testing-bucket/test-1.1.0.jar"],
        "main_class": "com.walmart.ei.testClass",
        "gcsPath":"gs://testing_part/path/",
        "test.id":"404",
        "correlation.id":"AUDIT_2222",
        "b.url":"http://testing-v1.net/test/",
    }
}

spark_task = DataprocSubmitJobOperator(
    task_id="spark_task",
    job=SPARK_JOB,
    location=REGION,
    project_id=PROJECT_ID,
    gcp_conn_id=CONN_ID,
)

spark_task

But this job fails because the arguments are not being passed to the Spark job.

What is the correct way to pass the arguments in SPARK_JOB?

CodePudding user response:

The job param is a dict that must have the same form as the protobuf message :class:`~google.cloud.dataproc_v1beta2.types.Job` (see the source code).

You can view the proto message here.

Note that there is a PR in progress to migrate the operator from v1beta2 to v1.
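
In particular, the SparkJob message has an args field (a repeated string) for job arguments and a properties map for Spark properties; arbitrary keys such as "gcsPath" are not fields of the message. If you want to check the shape of your dict locally, here is a minimal sketch (assuming the google-cloud-dataproc client shipped with the provider is installed) that builds the proto message straight from the dict, since unknown keys are rejected:

from google.cloud.dataproc_v1beta2.types import Job

# Building the message from the dict validates its shape: a key that is not a
# field of the proto (e.g. "gcsPath" placed directly under "spark_job") raises
# an error before the job is ever submitted.
Job(SPARK_JOB)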

CodePudding user response:

Found a way to pass the params: since args takes a list, I was able to run it by passing them this way.

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["gs://testing-bucket/test-1.1.0.jar"],
        "main_class": "com.walmart.ei.testClass",
        "args": [
            "gcsPath=gs://testing_part/path/",
            "test.id=404",
            "correlation.id=AUDIT_2222",
            "b.url=http://testing-v1.net/test/",
        ],
    },
}
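
For completeness, here is a rough sketch of wiring this job dict into a DAG. PROJECT_ID, REGION, CLUSTER_NAME and CONN_ID are placeholders taken from the question, and depending on the provider version the operator may expect region= instead of location=:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Placeholder values; replace with your own project, region, cluster and connection.
PROJECT_ID = "testing-1"
REGION = "us-central1"
CLUSTER_NAME = "test-cluster"
CONN_ID = "google_cloud_default"

with DAG(
    dag_id="submit_spark_job",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    spark_task = DataprocSubmitJobOperator(
        task_id="spark_task",
        job=SPARK_JOB,          # the dict defined above
        location=REGION,        # newer provider versions use region=REGION
        project_id=PROJECT_ID,
        gcp_conn_id=CONN_ID,
    )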