在Airflow中,我们可以使用Python模块 "holidayapi" 来检查假日。以下是一个简单的DAG示例,该DAG在假日期间不运行。
import holidayapi
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
holiday_client = holidayapi.v1('your-holiday-api-key')
holiday_country = 'US'
holiday_year = datetime.now().year
holiday_data = holiday_client.holidays(holiday_year, holiday_country)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'retries': 1,
}
dag = DAG(
'holiday_dag',
default_args=default_args,
schedule_interval='@daily',
catchup=False
)
def check_holiday():
if holiday_data['holidays']:
return "Skip DAG Execution: Holiday Found!"
t1 = BashOperator(
task_id='sample_task',
bash_command='echo "Hello World!"',
dag=dag
)
t2 = BashOperator(
task_id='check_holiday_task',
bash_command=check_holiday,
dag=dag
)
t2.set_upstream(t1)
在上面的代码中,我们使用了 "holidayapi"模块来获取有关指定国家和当前年份的假期信息。然后我们定义了一个名为 "check_holiday" 的函数,该函数检查当前日期是否是假期。如果返回一个字符串 "跳过DAG执行:发现假日",则表示 DAG 不应该继续执行。
最后,我们创建了两个 BashOperator 任务,“sample_task”和“check_holiday_task”(即定义的“check_holiday”函数)。在 DAG 运行时,先执行“sample_task”,然后再执行“check_holiday_task”。如果 “check_holiday_task” 返回“跳过DAG执行:发现假日”,则 DAG 不会执行。否则,DAG