可以使用Python datetime库中的datetime模块来动态生成当前时间并将其传递给DAG,在这种情况下,即使DAG多次运行,它也将使用当前时间进行计算。以下是一个示例:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
'example_dag',
schedule_interval="0 0 * * *",
default_args={
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
)
def print_current_time():
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Current time is {current_time}")
task = PythonOperator(
task_id='print_current_time_task',
python_callable=print_current_time,
dag=dag,
)
在上面的示例中,我们使用了datetime.now().strftime('%Y-%m-%d %H:%M:%S')
来获取当前的时间,然后将其传递给我们的任务函数。这将使DAG在每次运行时都使用实时的时间来计算其任务。