How do I pass custom data into the DatabricksRunNowOperator in airflow


I am trying to create a DAG that uses the DatabricksRunNowOperator to run a PySpark script. However, I can't figure out how to access the Airflow config inside that script.

parity_check_run = DatabricksRunNowOperator(
    task_id='my_task',
    databricks_conn_id='databricks_default',
    job_id='1837',
    spark_submit_params=["file.py", "pre-defined-param"],
    dag=dag,
)

I've tried accessing it via kwargs, but that doesn't seem to work.

CodePudding user response:

You can use the notebook_params argument, as shown in the documentation.

e.g.:

from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

job_id = 42

notebook_params = {
    "dry-run": "true",
    "oldest-time-to-consider": "1457570074236"
}

notebook_run = DatabricksRunNowOperator(
    task_id='notebook_run',
    job_id=job_id,
    notebook_params=notebook_params,
)

Then you can access the value via dbutils.widgets.get("oldest-time-to-consider") in the PySpark code.
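For example, a minimal sketch of the notebook side, assuming the notebook backing the job reads the two widgets defined above (the variable names are illustrative):

# Inside the Databricks notebook attached to the job;
# dbutils is available implicitly in the notebook runtime
dry_run = dbutils.widgets.get("dry-run")                      # -> "true"
oldest_time = dbutils.widgets.get("oldest-time-to-consider")  # -> "1457570074236"

# Widget values always arrive as strings, so convert as needed
if dry_run == "true":
    print(f"Dry run: only considering records newer than {oldest_time}")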

CodePudding user response:

The DatabricksRunNowOperator supports several ways of passing parameters to an existing job, depending on how the job is defined (doc):

  • notebook_params if the job uses notebooks - a dictionary of widget name -> value. You can fetch the values with dbutils.widgets.get
  • python_params - a list of parameters that will be passed to a Python task; you can fetch them via sys.argv (see the sketch after this list)
  • jar_params - a list of parameters that will be passed to a Jar task. You can read them as usual for a Java/Scala program
  • spark_submit_params - a list of parameters that will be passed to spark-submit
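Since the original question runs a Python file, here is a hedged sketch of the python_params route, assuming job 1837 is defined with a Python task and that file.py and the parameter values are purely illustrative (dag is the DAG object from the original question):

# DAG side: pass custom values to the existing job's Python task
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

run_python_task = DatabricksRunNowOperator(
    task_id='my_task',
    databricks_conn_id='databricks_default',
    job_id='1837',
    python_params=["--run-date", "2022-08-11", "--dry-run"],  # illustrative values
    dag=dag,
)

# Job side (file.py): the values arrive as command-line arguments
import sys

print(sys.argv[1:])  # ['--run-date', '2022-08-11', '--dry-run']

Note that python_params supplied at run time overwrite the parameters stored in the job definition for that run, so pass everything the script expects.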