在Airflow中,BranchDateTimeOperator操作符可以用于判断当前时间是否在特定的时间范围内。默认情况下,它使用>=运算符来比较时间范围的上限和下限。但是,这个特定的比较操作符可以通过operator_kwargs参数来进行自定义设置。下面是一个使用>运算符设置时间范围上限和下限的示例:
from airflow.operators.branch_datetime import BranchDateTimeOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 11, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('test_branch_datetime_operator', default_args=default_args, schedule_interval="@once")
task1 = DummyOperator(task_id='task1', dag=dag)
branch_op = BranchDateTimeOperator(
task_id='branch_task',
branch_time=datetime(2021, 11, 2, 10, 0),
follow_task_ids=['task_after_lower_bound', 'task_after_upper_bound'],
operator_kwargs={'lower_bound_operator': '>', 'upper_bound_operator': '>'},
dag=dag)
task_after_lower_bound = DummyOperator(task_id='task_after_lower_bound', dag=dag)
task_after_upper_bound = DummyOperator(task_id='task_after_upper_bound', dag=dag)
task1 >> branch_op >> [task_after_lower_bound, task_after_upper_bound]
在这个示例中,我们使用BranchDateTimeOperator操作符来创建一个分支任务,并将当前时间设置为2021年11月2日早上10点。我们使用了operator_kwargs参数,将lower_bound_operator和upper_bound_operator设置为>,因此下限和上限的判断将使用>运算符而不是默认的>=运算符。
接下来,我们创建了两个分支任务,一个是在下限之后运行的task_after_lower_bound,另一个是在上限之后运行的task_after_upper_bound。在这个示例中,如果当前时间在2021年11月2日早上10点之前,则分支任务将流转到task_after_lower_bound。如果当前时间在2021年11月2日早上10点之后,则分支任务将流转到task_after_upper_bound。