要获取当前dag_run的开始日期,可以使用 Airflow 的上下文变量 execution_date
。下面是一个获取当前 dag_run 开始日期的示例代码:
from airflow.models import DAG, TaskInstance
from datetime import datetime
# 创建 DAG
dag = DAG(
dag_id='example_dag',
start_date=datetime(2022, 1, 1),
schedule_interval='@daily'
)
# 创建任务
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello, World!"',
dag=dag
)
# 获取当前 dag_run 的开始日期
def get_current_dag_run_start_date(**context):
ti = context['task_instance']
dag_run_start_date = ti.get_dagrun().execution_date
print(f"Current dag_run start date: {dag_run_start_date}")
# 创建任务调用获取当前 dag_run 开始日期的方法
task2 = PythonOperator(
task_id='task2',
python_callable=get_current_dag_run_start_date,
provide_context=True,
dag=dag
)
# 设置任务的依赖关系
task2.set_upstream(task1)
在上面的代码中,我们创建了一个 DAG,并定义了两个任务:task1
和 task2
。task2
使用 PythonOperator
调用 get_current_dag_run_start_date
方法来获取当前 dag_run 的开始日期。在方法中,我们通过 context
参数获取当前任务实例 ti
,然后使用 ti.get_dagrun().execution_date
获取当前 dag_run 的开始日期,并进行打印。
在这个示例中,task2
依赖于 task1
,所以当 task1
执行完毕后,task2
将会执行,并输出当前 dag_run 的开始日期。
请确保将 from airflow.models import DAG, TaskInstance
和 from airflow.operators import BashOperator, PythonOperator
添加到你的代码中,并根据实际情况修改 DAG 的相关配置。