要在目标DAG中使用TriggerDagRunOperator设置execution_date以使用当前execution_date,可以按照以下步骤进行操作:
首先,确保你已经按照文档正确安装和配置了Apache Airflow。
在目标DAG文件中,导入所需的模块和操作符:
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime
datetime.now().strftime('%Y-%m-%d')
来实现:def get_current_execution_date(**context):
return datetime.now().strftime('%Y-%m-%d')
dag = DAG(
'target_dag',
schedule_interval='0 0 * * *', # 每天午夜运行一次
default_args={
'owner': 'airflow',
'start_date': datetime(2022, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
)
provide_context=True
传递给TriggerDagRunOperator,以便可以在get_current_execution_date函数中访问上下文:trigger_dag_run = TriggerDagRunOperator(
task_id='trigger_dag_run',
trigger_dag_id='target_dag',
execution_date="{{ ti.xcom_pull(task_ids='get_execution_date') }}",
provide_context=True,
dag=dag
)
get_current_execution_date
函数作为PythonOperator的参数传递,并将任务的task_id设置为get_execution_date
:get_execution_date = PythonOperator(
task_id='get_execution_date',
python_callable=get_current_execution_date,
provide_context=True,
dag=dag
)
get_execution_date
任务作为trigger_dag_run
任务的依赖项:get_execution_date >> trigger_dag_run
trigger_dag_run
任务将使用当前的execution_date来触发目标DAG的运行。请注意,如果你希望在目标DAG中使用当前execution_date,你还需要在目标DAG的任务中使用provide_context=True
,以便可以从上下文中获取execution_date。