I'm running a command with Python's subprocess.Popen. The line below works fine:
subprocess.Popen(["airflow", "db", "clean", "--verbose", "--clean-before-timestamp", "2022-07-29"])
However, I want to replace the hard-coded date with a variable that goes back a number of days from the current date, something like this:
subprocess.Popen(["airflow", "db", "clean", "--verbose", "--clean-before-timestamp", "time_diff"])
And I compute time_diff like this:
import datetime as DT
import subprocess
import time
import traceback

today = DT.date.today()
time_diff = today - DT.timedelta(days=30)
try:
    p = subprocess.Popen(["airflow", "db", "clean", "--verbose", "--clean-before-timestamp", "time_diff"])
    while p.poll() is None:
        time.sleep(5)
except Exception:
    print(traceback.format_exc(), flush=True)
But the above code gives this error:
command error: argument --clean-before-timestamp: invalid parse value: 'time_diff'
How can I pass the date variable there without hard-coding it? I will be running this as a daily cron job.
CodePudding user response:
The answer from @rdas solves your problem, but since you are calling the Airflow CLI from within Airflow, I would like to suggest a better method than subprocess or BashOperator.
The Airflow CLI is written in Python, and its commands call Python functions that you can also call directly from Airflow. For example, the command airflow db clean calls the function run_cleanup, which you can call yourself to clean the database without a complex bash command, and any problems surface as Python exceptions.
import pendulum
from airflow.utils.db_cleanup import run_cleanup
run_cleanup(clean_before_timestamp=pendulum.today() - pendulum.duration(days=30), verbose=True, confirm=False)
It might sound a little complicated, but it's better for long-term use cases.
CodePudding user response:
Your code passes the literal string "time_diff" rather than the variable's value. Format the date object into a string using strftime before passing it to Popen:
p = subprocess.Popen(["airflow", "db", "clean", "--verbose", "--clean-before-timestamp", time_diff.strftime("%Y-%m-%d")])
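Putting the pieces together, here is a minimal sketch of the whole flow. The helper names build_clean_command and run_clean are hypothetical, as is the today parameter, which exists only so the date arithmetic can be checked without depending on the current day. It uses subprocess.run with check=True in place of the Popen polling loop, which is a swapped-in choice and not what the question used.

```python
import datetime as dt
import subprocess


def build_clean_command(days_back, today=None):
    """Return the argv list for `airflow db clean` with a computed cutoff date.

    Hypothetical helper: `today` can be injected for testing and
    defaults to the current date.
    """
    today = today or dt.date.today()
    cutoff = today - dt.timedelta(days=days_back)
    # Every argv element must be a string, so the date is formatted explicitly.
    return [
        "airflow", "db", "clean", "--verbose",
        "--clean-before-timestamp", cutoff.strftime("%Y-%m-%d"),
    ]


def run_clean(days_back=30):
    """Run the command, raising CalledProcessError on a non-zero exit code."""
    return subprocess.run(build_clean_command(days_back), check=True)
```

Calling run_clean() from the daily job would then clean everything older than 30 days, and a failed command shows up as an exception instead of a silently ignored exit code.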