Airflow DataprocSubmitJobOperator - ValueError: Protocol message Job has no "python_file_uris" field


I'm using the DataprocSubmitJobOperator in Airflow to schedule PySpark jobs, and I'm unable to pass pyfiles to the PySpark job.

Here is the code I'm using:

DAG

# working - passing jars
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : ["gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar",
                               'gs://dataproc-spark-jars/bson-4.0.5.jar','gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar','gs://dataproc-spark-jars/mongodb-driver-core-4.0.5.jar',
                               'gs://dataproc-spark-jars/mongodb-driver-sync-4.0.5.jar','gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar','gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar',
                           'gs://dataproc-spark-jars/spark-token-provider-kafka-0-10_2.12-3.1.3.jar','gs://dataproc-spark-jars/htrace-core4-4.1.0-incubating.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-jars/spark-sql-kafka-0-10_2.12-3.1.3.jar','gs://dataproc-spark-jars/hadoop-client-runtime-3.3.1.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-jars/kafka-clients-3.2.0.jar','gs://dataproc-spark-jars/commons-pool2-2.11.1.jar'],
        "file_uris":['gs://kafka-certs/versa-kafka-gke-ca.p12','gs://kafka-certs/syslog-vani.p12',
                     'gs://kafka-certs/alarm-compression-user.p12','gs://kafka-certs/appstats-user.p12',
                     'gs://kafka-certs/insights-user.p12','gs://kafka-certs/intfutil-user.p12',
                     'gs://kafka-certs/reloadpred-chkpoint-user.p12','gs://kafka-certs/reloadpred-user.p12',
                     'gs://dataproc-spark-configs/topic-customer-map.cfg','gs://dataproc-spark-configs/params.cfg','gs://kafka-certs/issues-user.p12','gs://kafka-certs/anomaly-user.p12','gs://kafka-certs/appstat-anomaly-user.p12','gs://kafka-certs/appstat-agg-user.p12','gs://kafka-certs/alarmblock-user.p12']
        },
        "python_file_uris": ['gs://dagger-mongo/move2mongo_api.zip']
}

path = "gs://dataproc-spark-configs/pip_install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl kafka-python google-cloud-storage pyspark'},
).make()

with models.DAG(
        'Versa-kafka2mongo_api',
        # Continue to run DAG twice per day
        default_args=default_dag_args,
        #schedule_interval='*/10 * * * *',
        schedule_interval=None,
        catchup=False,
        ) as dag:

    # create_dataproc_cluster
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name=CLUSTER_NAME,
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG
    )

    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule=trigger_rule.TriggerRule.ALL_SUCCESS
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_spark >> delete_dataproc_cluster

Here is the Python file (call_kafka2mongo_api.py) code:

from move2mongo_api import alarmBlock_api
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('kafka2mongo_api').getOrCreate()
print(f" spark : {spark}")
spark.sparkContext.addPyFile("move2mongo_api.zip")

cust = ['versa']
for c in cust:
    ap = alarmBlock_api(spark, c)
    ap.readFromKafka()

Please note: I upload move2mongo_api.zip to the storage bucket (gs://dagger-mongo/move2mongo_api.zip) before running the Airflow job. move2mongo_api.zip contains the Python file alarmBlock_api.py, which is referenced in the job file call_kafka2mongo_api.py.
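For reference, a minimal sketch of how such an archive could be built and uploaded with Python (the local file layout and helper script name are assumptions; only the bucket and object name come from the question):

# build_and_upload_zip.py -- hypothetical helper; local paths are assumptions
import zipfile

from google.cloud import storage

# Package alarmBlock_api.py into move2mongo_api.zip (add further modules as needed).
with zipfile.ZipFile("move2mongo_api.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    zf.write("alarmBlock_api.py")

# Upload the archive to the bucket referenced in python_file_uris.
client = storage.Client()
bucket = client.bucket("dagger-mongo")
bucket.blob("move2mongo_api.zip").upload_from_filename("move2mongo_api.zip")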

When I run this workflow, the error I get is shown below:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 1849, in execute
    job_object = self.hook.submit_job(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 439, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 870, in submit_job
    return client.submit_job(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataproc_v1/services/job_controller/client.py", line 493, in submit_job
    request = jobs.SubmitJobRequest(request)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 516, in __init__
    pb_value = marshal.to_proto(pb_type, value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 211, in to_proto
    pb_value = rule.to_proto(value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
    return self._descriptor(**value)
ValueError: Protocol message Job has no "python_file_uris" field.
[2022-07-17, 18:07:00 UTC] {taskinstance.py:1279} INFO - Marking task as UP_FOR_RETRY. dag_id=Versa-kafka2mongo_api, task_id=run_dataproc_spark, execution_date=20220717T180647, start_date=20220717T180659, end_date=20220717T180700

What am I doing wrong here? Any ideas on how to debug/fix this?

Thanks in advance!

CodePudding user response:

You appear to have a layout issue in PYSPARK_JOB: "python_file_uris" is a field of the nested pyspark_job (PySparkJob) message, not of the top-level Job message, so the proto marshaller rejects it when building the SubmitJobRequest.

You have:

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : []
        },
        "python_file_uris": []
}

You want:

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : []
        "python_file_uris": []
    },
}
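Applied to the config from the question, the corrected job would look roughly like this (jar and file lists elided for brevity; the only change is moving "python_file_uris" inside the "pyspark_job" block):

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": [...],   # same jar list as in the question
        "file_uris": [...],       # same cert/config list as in the question
        "python_file_uris": ["gs://dagger-mongo/move2mongo_api.zip"],
    },
}

The keys under "pyspark_job" must match the fields of Dataproc's PySparkJob message, and the top-level keys must match the Job message, which is why the misplaced "python_file_uris" triggered the ValueError.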