I'm using the DataprocSubmitJobOperator in Airflow to schedule PySpark jobs, and I'm unable to pass pyfiles to the PySpark job.
Here is the code I'm using:
DAG
# working - passing jars
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": [
            "gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar", "gs://dataproc-spark-jars/bson-4.0.5.jar",
            "gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar", "gs://dataproc-spark-jars/mongodb-driver-core-4.0.5.jar",
            "gs://dataproc-spark-jars/mongodb-driver-sync-4.0.5.jar", "gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar",
            "gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar", "gs://dataproc-spark-jars/spark-token-provider-kafka-0-10_2.12-3.1.3.jar",
            "gs://dataproc-spark-jars/htrace-core4-4.1.0-incubating.jar", "gs://dataproc-spark-jars/hadoop-client-3.3.1.jar",
            "gs://dataproc-spark-jars/spark-sql-kafka-0-10_2.12-3.1.3.jar", "gs://dataproc-spark-jars/hadoop-client-runtime-3.3.1.jar",
            "gs://dataproc-spark-jars/hadoop-client-3.3.1.jar", "gs://dataproc-spark-jars/kafka-clients-3.2.0.jar",
            "gs://dataproc-spark-jars/commons-pool2-2.11.1.jar",
        ],
        "file_uris": [
            "gs://kafka-certs/versa-kafka-gke-ca.p12", "gs://kafka-certs/syslog-vani.p12",
            "gs://kafka-certs/alarm-compression-user.p12", "gs://kafka-certs/appstats-user.p12",
            "gs://kafka-certs/insights-user.p12", "gs://kafka-certs/intfutil-user.p12",
            "gs://kafka-certs/reloadpred-chkpoint-user.p12", "gs://kafka-certs/reloadpred-user.p12",
            "gs://dataproc-spark-configs/topic-customer-map.cfg", "gs://dataproc-spark-configs/params.cfg",
            "gs://kafka-certs/issues-user.p12", "gs://kafka-certs/anomaly-user.p12",
            "gs://kafka-certs/appstat-anomaly-user.p12", "gs://kafka-certs/appstat-agg-user.p12",
            "gs://kafka-certs/alarmblock-user.p12",
        ],
    },
    "python_file_uris": ["gs://dagger-mongo/move2mongo_api.zip"]
}
path = "gs://dataproc-spark-configs/pip_install.sh"
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl kafka-python google-cloud-storage pyspark'},
).make()
with models.DAG(
    'Versa-kafka2mongo_api',
    # Continue to run DAG twice per day
    default_args=default_dag_args,
    # schedule_interval='*/10 * * * *',
    schedule_interval=None,
    catchup=False,
) as dag:
    # create_dataproc_cluster
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name=CLUSTER_NAME,
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG,
    )

    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule=trigger_rule.TriggerRule.ALL_SUCCESS,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_spark >> delete_dataproc_cluster
Here is the Python file (call_kafka2mongo_api.py) code:
from move2mongo_api import alarmBlock_api
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('kafka2mongo_api').getOrCreate()
print(f" spark : {spark}")
spark.sparkContext.addPyFile("move2mongo_api.zip")

cust = ['versa']
for c in cust:
    ap = alarmBlock_api(spark, c)
    ap.readFromKafka()
Please note: I upload move2mongo_api.zip to the storage bucket (gs://dagger-mongo/move2mongo_api.zip) before running the Airflow job. move2mongo_api.zip contains the Python file alarmBlock_api.py, which is referenced in the job file call_kafka2mongo_api.py.
When I run this workflow, the error I get is shown below:
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 1849, in execute
job_object = self.hook.submit_job(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 439, in inner_wrapper
return func(self, *args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 870, in submit_job
return client.submit_job(
File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataproc_v1/services/job_controller/client.py", line 493, in submit_job
request = jobs.SubmitJobRequest(request)
File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 516, in __init__
pb_value = marshal.to_proto(pb_type, value)
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 211, in to_proto
pb_value = rule.to_proto(value)
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
return self._descriptor(**value)
ValueError: Protocol message Job has no "python_file_uris" field.
[2022-07-17, 18:07:00 UTC] {taskinstance.py:1279} INFO - Marking task as UP_FOR_RETRY. dag_id=Versa-kafka2mongo_api, task_id=run_dataproc_spark, execution_date=20220717T180647, start_date=20220717T180659, end_date=20220717T180700
What am I doing wrong here? Any ideas on how to debug/fix this?
TIA!
CodePudding user response:
You appear to have a layout issue in PYSPARK_JOB: python_file_uris is placed at the top level of the job dict, but it is a field of the pyspark_job block (the PySparkJob message), not of the Job message itself, which is why the marshaller rejects it.
You have:
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": []
    },
    "python_file_uris": []
}
You want:
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": [],
        "python_file_uris": []
    },
}