在Airflow中,可以使用Jinja模板语言将时间戳声明为变量,并在每个任务中使用不同的时间戳。
以下是一个示例代码,展示了如何在Airflow中使用Jinja模板语言和变量来设置每个任务的时间戳:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def my_task(timestamp, **kwargs):
# 在这里可以使用timestamp变量进行任务处理
print(f"任务的时间戳是:{timestamp}")
# 定义默认参数
default_args = {
'start_date': datetime.datetime(2021, 1, 1),
'retries': 3,
'retry_delay': datetime.timedelta(minutes=5)
}
# 创建DAG对象
dag = DAG(
'timestamp_example',
default_args=default_args,
schedule_interval='@daily'
)
# 创建任务1,使用当前时间戳作为变量
task1 = PythonOperator(
task_id='task1',
python_callable=my_task,
op_kwargs={'timestamp': '{{ execution_date }}'},
dag=dag
)
# 创建任务2,使用昨天的时间戳作为变量
task2 = PythonOperator(
task_id='task2',
python_callable=my_task,
op_kwargs={'timestamp': '{{ prev_execution_date }}'},
dag=dag
)
# 设置任务之间的依赖关系
task1 >> task2
在上述代码中,{{ execution_date }}
表示当前任务的执行时间戳,{{ prev_execution_date }}
表示上一个任务的执行时间戳。通过在op_kwargs
参数中传递这些变量,可以在每个任务中获取到不同的时间戳。
这样,每个任务在执行时都会使用不同的时间戳。可以根据实际需求,使用不同的Jinja模板语法来处理时间戳。