My Spark pipeline is written in Scala and my Airflow DAG is written in Python.
I am working on a feature that runs a task only when the DAG trigger date equals a run_date stored in a table, and skips it otherwise. My plan is to use ShortCircuitOperator for this.
I have written a Scala function that fetches that date from a Hive table. I want to call this function from my Python DAG file so that I can get the date value and use it in the ShortCircuitOperator. So I am looking for a way to call my Scala function from the Python DAG file. Please also suggest a better approach if there is one.
skip_if_not_run_date = ShortCircuitOperator(
    task_id='skip_if_not_run_date',
    python_callable=getLatestRunDate,  # need to call the Scala function here
    dag=dag,
    provide_context=True,
)
Spark Scala function that I need to call:
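import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, current_date, month, year}

// Assumes a SparkSession named `spark` is in scope.
// The `df` parameter is unused; the table is read directly.
def getLatestRunDate(df: DataFrame): DataFrame = {
  val runCatalog = spark.table("my_hive_schema.my_run_catalog_table")
  runCatalog
    .filter(col("date_of_job_run").leq(current_date())
      and col("month_nbr").geq(month(current_date()) - 2)
      and col("yr_nbr").equalTo(year(current_date()))) // year() requires a date column
    .select("date_of_job_run")
}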
CodePudding user response:
I suggest splitting this into two tasks. In the first task, call your Scala function and return the value through XCom. The second task can be the ShortCircuitOperator, which receives the XCom value from the first task as an argument.
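A minimal sketch of the second task's callable, assuming the first task pushes the run date to XCom as a 'YYYY-MM-DD' string. The function name `is_run_date`, the task id `get_latest_run_date`, and the spark-submit command in the comments are all hypothetical, not taken from your code. One way the first task could work is a BashOperator that runs your Spark job: BashOperator pushes the last line of the command's output to XCom, so the job would need to print the date as its very last line.

```python
from datetime import date


def is_run_date(ti, ds, upstream_task_id="get_latest_run_date", **_):
    """Return True only when the DAG's logical date equals the upstream date.

    `ti` (the TaskInstance) and `ds` (logical date as 'YYYY-MM-DD') come from
    the Airflow task context. `upstream_task_id` is a hypothetical task name.
    """
    raw = ti.xcom_pull(task_ids=upstream_task_id)
    if not raw:
        return False  # nothing was pushed upstream, so skip downstream tasks
    return date.fromisoformat(str(raw).strip()) == date.fromisoformat(ds)


# Wiring sketch (requires Airflow; the class and jar names are assumptions):
#
#   get_latest_run_date = BashOperator(
#       task_id="get_latest_run_date",
#       # The job must print the date as the last line of its output.
#       bash_command="spark-submit --class com.example.LatestRunDate my-pipeline.jar",
#       dag=dag,
#   )
#   skip_if_not_run_date = ShortCircuitOperator(
#       task_id="skip_if_not_run_date",
#       python_callable=is_run_date,
#       provide_context=True,  # only needed on Airflow 1.10
#       dag=dag,
#   )
#   get_latest_run_date >> skip_if_not_run_date
```

When the callable returns False, ShortCircuitOperator skips every downstream task. When it returns True, the DAG continues as normal.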