Airflow 周调度的起始日期问题:解决方法是自定义一个继承自 timedelta 的调度间隔类(WeeklyInterval),并将其作为 schedule_interval 传给 DAG。代码示例如下:
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.utils import timezone
from airflow.utils.dates import days_ago
class WeeklyInterval(timedelta):
    """A ``timedelta`` measured in weeks that can carry a pinned start date.

    Args:
        weeks: Length of the interval in weeks.
        start_date: Optional fixed start date. When given,
            ``get_start_date`` always returns it.

    NOTE(review): nothing in this file shows Airflow itself calling
    ``get_start_date``; confirm how the scheduler is expected to use it.
    """

    def __new__(cls, weeks, start_date=None):
        # timedelta is immutable and consumes its arguments in __new__, so the
        # extra ``start_date`` argument has to be intercepted here rather than
        # in __init__ (timedelta.__new__ would reject the unknown keyword).
        interval = super().__new__(cls, weeks=weeks)
        interval.start_date = start_date
        return interval

    def __reduce__(self):
        # The inherited __reduce__ rebuilds from (days, seconds, microseconds),
        # which does not match this constructor's (weeks, start_date)
        # signature and would corrupt copies and pickles.
        return (type(self), (self / timedelta(weeks=1), self.start_date))

    def get_start_date(self, dttm):
        """Return the pinned start date, or the Sunday on or before ``dttm``.

        Args:
            dttm: Reference datetime, naive or timezone-aware.

        Returns:
            ``self.start_date`` when one was supplied; otherwise ``dttm``
            moved back by whole days to the most recent Sunday, keeping the
            time of day and tzinfo of ``dttm``.
        """
        if self.start_date is not None:
            return self.start_date
        # 1970-01-04 is a Sunday. Giving the anchor the same tzinfo as
        # ``dttm`` keeps the subtraction valid for naive and aware inputs.
        anchor = datetime(1970, 1, 4, tzinfo=dttm.tzinfo)
        days_past_sunday = (dttm - anchor).days % 7
        return dttm - timedelta(days=days_past_sunday)
# Arguments collected here are handed to the DAG below as ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # NOTE(review): presumably a date two days before today; confirm
    # against airflow.utils.dates.days_ago.
    'start_date': days_ago(2),
    # One retry, with a one-minute delay between attempts.
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
# The DAG definition: one-week schedule interval, shared default_args.
dag = DAG(
    dag_id='weekly_dag',
    default_args=default_args,
    description='Weekly DAG',
    schedule_interval=WeeklyInterval(weeks=1),
)