Airflow支持自定义日历和弹性日历。你可以通过扩展BaseCalendar
类来创建自定义日历,或者使用HolidayBase
类来创建弹性日历。
下面是一个示例代码,展示了如何创建一个自定义日历和一个弹性日历:
from airflow.utils import dates
from airflow.utils.holidays import HolidayBase, register_calendar
class CustomCalendar(dates.BaseCalendar):
def __init__(self):
super().__init__()
def is_working_day(self, dt):
# 在这里实现自定义的工作日逻辑,返回True表示是工作日,返回False表示非工作日
# 你可以根据日期dt来判断是否为工作日
return True
class CustomHoliday(HolidayBase):
def __init__(self, holiday_dates):
self.holiday_dates = holiday_dates
def is_working_day(self, dt):
# 在这里实现弹性日历的工作日逻辑,返回True表示是工作日,返回False表示非工作日
# 你可以根据日期dt和self.holiday_dates来判断是否为工作日
return False
# 注册自定义日历
register_calendar("custom_calendar", CustomCalendar())
# 注册弹性日历
register_calendar("custom_holiday", CustomHoliday(["2022-01-01", "2022-12-25"]))
在Airflow的DAG定义中,你可以使用schedule_interval
参数来指定使用哪个日历。例如:
from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'start_date': datetime(2022, 1, 1),
}
with DAG('custom_calendar_example', default_args=default_args, schedule_interval='0 0 * * *', calendar='custom_calendar') as dag:
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2')
task1 >> task2
在上面的示例中,我们创建了一个名为custom_calendar_example
的DAG,使用了自定义日历custom_calendar
。该DAG的任务将在每天的午夜执行。
类似地,你也可以使用弹性日历custom_holiday
:
with DAG('custom_holiday_example', default_args=default_args, schedule_interval='0 0 * * *', calendar='custom_holiday') as dag:
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2')
task1 >> task2
在上面的示例中,我们创建了一个名为custom_holiday_example
的DAG,使用了弹性日历custom_holiday
。该DAG的任务将在每天的午夜执行,但会跳过自定义的假期日期。